Skip to main content

drasi_source_postgres/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PostgreSQL Replication Source Plugin for Drasi
16//!
17//! This plugin captures data changes from PostgreSQL databases using logical replication.
18//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
19//! messages in real-time, converting them to Drasi source change events.
20//!
21//! # Prerequisites
22//!
23//! Before using this source, you must configure PostgreSQL for logical replication:
24//!
25//! 1. **Enable logical replication** in `postgresql.conf`:
26//!    ```text
27//!    wal_level = logical
28//!    max_replication_slots = 10
29//!    max_wal_senders = 10
30//!    ```
31//!
32//! 2. **Create a publication** for the tables you want to monitor:
33//!    ```sql
34//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
35//!    ```
36//!
37//! 3. **Create a replication slot** (optional - the source can create one automatically):
38//!    ```sql
39//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
40//!    ```
41//!
42//! 4. **Grant replication permissions** to the database user:
43//!    ```sql
44//!    ALTER ROLE drasi_user REPLICATION;
45//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
46//!    ```
47//!
48//! # Architecture
49//!
50//! The source has two main components:
51//!
52//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
53//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
54//!   ensure consistency.
55//!
56//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
57//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
58//!
59//! # Configuration
60//!
61//! | Field | Type | Default | Description |
62//! |-------|------|---------|-------------|
63//! | `host` | string | `"localhost"` | PostgreSQL host |
64//! | `port` | u16 | `5432` | PostgreSQL port |
65//! | `database` | string | *required* | Database name |
66//! | `user` | string | *required* | Database user (must have replication permission) |
67//! | `password` | string | `""` | Database password |
68//! | `tables` | string[] | `[]` | Tables to replicate |
69//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
70//! | `publication_name` | string | `"drasi_publication"` | Publication name |
71//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
72//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
73//!
74//! # Example Configuration (YAML)
75//!
76//! ```yaml
77//! source_type: postgres
78//! properties:
79//!   host: db.example.com
80//!   port: 5432
81//!   database: production
82//!   user: replication_user
83//!   password: secret
84//!   tables:
85//!     - users
86//!     - orders
87//!   slot_name: drasi_slot
88//!   publication_name: drasi_publication
89//!   table_keys:
90//!     - table: users
91//!       key_columns: [id]
92//!     - table: orders
93//!       key_columns: [order_id]
94//! ```
95//!
96//! # Data Format
97//!
98//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
99//! Each row change is mapped as follows:
100//!
101//! ## Node Mapping
102//!
103//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
104//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
105//! - **Properties**: All columns from the row (column names become property keys)
106//!
107//! ## WAL Message to SourceChange
108//!
109//! | WAL Operation | SourceChange |
110//! |---------------|--------------|
111//! | INSERT | `SourceChange::Insert { element: Node }` |
112//! | UPDATE | `SourceChange::Update { element: Node }` |
113//! | DELETE | `SourceChange::Delete { metadata }` |
114//!
115//! ## Example Mapping
116//!
117//! Given a PostgreSQL table:
118//!
119//! ```sql
120//! CREATE TABLE users (
121//!     id SERIAL PRIMARY KEY,
122//!     name VARCHAR(100),
123//!     email VARCHAR(255),
124//!     age INTEGER
125//! );
126//!
127//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
128//! ```
129//!
130//! Produces a SourceChange equivalent to:
131//!
132//! ```json
133//! {
134//!     "type": "Insert",
135//!     "element": {
136//!         "metadata": {
137//!             "element_id": "public:users:1",
138//!             "source_id": "pg-source",
139//!             "labels": ["users"],
140//!             "effective_from": 1699900000000000
141//!         },
142//!         "properties": {
143//!             "id": 1,
144//!             "name": "Alice",
145//!             "email": "alice@example.com",
146//!             "age": 30
147//!         }
148//!     }
149//! }
150//! ```
151//!
152//! # Usage Example
153//!
//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! // The builder takes the source id up front and `build()` returns the
//! // ready-to-use source (not just a configuration).
//! let source = PostgresReplicationSource::builder("pg-source")
//!     .with_host("db.example.com")
//!     .with_database("production")
//!     .with_user("replication_user")
//!     .with_password("secret")
//!     .with_tables(vec!["users".to_string(), "orders".to_string()])
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
//! ```
169
170pub mod config;
171pub mod connection;
172pub mod decoder;
173pub mod protocol;
174pub mod scram;
175pub mod stream;
176pub mod types;
177
178pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
179
180use anyhow::Result;
181use async_trait::async_trait;
182use log::{error, info};
183use std::collections::HashMap;
184use std::sync::Arc;
185use tokio::sync::RwLock;
186
187use drasi_lib::channels::{DispatchMode, *};
188use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
189use drasi_lib::Source;
190use tracing::Instrument;
191
192/// PostgreSQL replication source that captures changes via logical replication.
193///
194/// This source connects to PostgreSQL using the replication protocol and decodes
195/// WAL messages in real-time, converting them to Drasi source change events.
196///
197/// # Fields
198///
199/// - `base`: Common source functionality (dispatchers, status, lifecycle)
200/// - `config`: PostgreSQL connection and replication configuration
201pub struct PostgresReplicationSource {
202    /// Base source implementation providing common functionality
203    base: SourceBase,
204    /// PostgreSQL source configuration
205    config: PostgresSourceConfig,
206}
207
208impl PostgresReplicationSource {
209    /// Create a builder for PostgresReplicationSource
210    ///
211    /// # Example
212    ///
213    /// ```rust,ignore
214    /// use drasi_source_postgres::PostgresReplicationSource;
215    ///
216    /// let source = PostgresReplicationSource::builder("pg-source")
217    ///     .with_host("db.example.com")
218    ///     .with_database("production")
219    ///     .with_user("replication_user")
220    ///     .with_password("secret")
221    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
222    ///     .with_bootstrap_provider(my_provider)
223    ///     .build()?;
224    /// ```
225    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
226        PostgresSourceBuilder::new(id)
227    }
228
229    /// Create a new PostgreSQL replication source.
230    ///
231    /// The event channel is automatically injected when the source is added
232    /// to DrasiLib via `add_source()`.
233    ///
234    /// # Arguments
235    ///
236    /// * `id` - Unique identifier for this source instance
237    /// * `config` - PostgreSQL source configuration
238    ///
239    /// # Returns
240    ///
241    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the base source cannot be initialized.
246    ///
247    /// # Example
248    ///
249    /// ```rust,ignore
250    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
251    ///
252    /// let config = PostgresSourceBuilder::new()
253    ///     .with_host("db.example.com")
254    ///     .with_database("mydb")
255    ///     .with_user("replication_user")
256    ///     .build();
257    ///
258    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
259    /// ```
260    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
261        let id = id.into();
262        let params = SourceBaseParams::new(id);
263        Ok(Self {
264            base: SourceBase::new(params)?,
265            config,
266        })
267    }
268
269    /// Create a new PostgreSQL replication source with custom dispatch settings
270    ///
271    /// The event channel is automatically injected when the source is added
272    /// to DrasiLib via `add_source()`.
273    pub fn with_dispatch(
274        id: impl Into<String>,
275        config: PostgresSourceConfig,
276        dispatch_mode: Option<DispatchMode>,
277        dispatch_buffer_capacity: Option<usize>,
278    ) -> Result<Self> {
279        let id = id.into();
280        let mut params = SourceBaseParams::new(id);
281        if let Some(mode) = dispatch_mode {
282            params = params.with_dispatch_mode(mode);
283        }
284        if let Some(capacity) = dispatch_buffer_capacity {
285            params = params.with_dispatch_buffer_capacity(capacity);
286        }
287        Ok(Self {
288            base: SourceBase::new(params)?,
289            config,
290        })
291    }
292}
293
294#[async_trait]
295impl Source for PostgresReplicationSource {
296    fn id(&self) -> &str {
297        &self.base.id
298    }
299
300    fn type_name(&self) -> &str {
301        "postgres"
302    }
303
304    fn properties(&self) -> HashMap<String, serde_json::Value> {
305        let mut props = HashMap::new();
306        props.insert(
307            "host".to_string(),
308            serde_json::Value::String(self.config.host.clone()),
309        );
310        props.insert(
311            "port".to_string(),
312            serde_json::Value::Number(self.config.port.into()),
313        );
314        props.insert(
315            "database".to_string(),
316            serde_json::Value::String(self.config.database.clone()),
317        );
318        props.insert(
319            "user".to_string(),
320            serde_json::Value::String(self.config.user.clone()),
321        );
322        // Don't expose password in properties
323        props.insert(
324            "tables".to_string(),
325            serde_json::Value::Array(
326                self.config
327                    .tables
328                    .iter()
329                    .map(|t| serde_json::Value::String(t.clone()))
330                    .collect(),
331            ),
332        );
333        props
334    }
335
336    fn auto_start(&self) -> bool {
337        self.base.get_auto_start()
338    }
339
340    async fn start(&self) -> Result<()> {
341        if self.base.get_status().await == ComponentStatus::Running {
342            return Ok(());
343        }
344
345        self.base.set_status(ComponentStatus::Starting).await;
346        info!("Starting PostgreSQL replication source: {}", self.base.id);
347
348        let config = self.config.clone();
349        let source_id = self.base.id.clone();
350        let dispatchers = self.base.dispatchers.clone();
351        let status_tx = self.base.status_tx();
352        let status_clone = self.base.status.clone();
353
354        // Get instance_id from context for log routing isolation
355        let instance_id = self
356            .base
357            .context()
358            .await
359            .map(|c| c.instance_id)
360            .unwrap_or_default();
361
362        // Create span for spawned task so log::info!, log::error! etc are routed
363        let source_id_for_span = source_id.clone();
364        let span = tracing::info_span!(
365            "postgres_replication_task",
366            instance_id = %instance_id,
367            component_id = %source_id_for_span,
368            component_type = "source"
369        );
370
371        let task = tokio::spawn(
372            async move {
373                if let Err(e) = run_replication(
374                    source_id.clone(),
375                    config,
376                    dispatchers,
377                    status_tx.clone(),
378                    status_clone.clone(),
379                )
380                .await
381                {
382                    error!("Replication task failed for {source_id}: {e}");
383                    *status_clone.write().await = ComponentStatus::Error;
384                    if let Some(ref tx) = *status_tx.read().await {
385                        let _ = tx
386                            .send(ComponentEvent {
387                                component_id: source_id,
388                                component_type: ComponentType::Source,
389                                status: ComponentStatus::Error,
390                                timestamp: chrono::Utc::now(),
391                                message: Some(format!("Replication failed: {e}")),
392                            })
393                            .await;
394                    }
395                }
396            }
397            .instrument(span),
398        );
399
400        *self.base.task_handle.write().await = Some(task);
401        self.base.set_status(ComponentStatus::Running).await;
402
403        self.base
404            .send_component_event(
405                ComponentStatus::Running,
406                Some("PostgreSQL replication started".to_string()),
407            )
408            .await?;
409
410        Ok(())
411    }
412
413    async fn stop(&self) -> Result<()> {
414        if self.base.get_status().await != ComponentStatus::Running {
415            return Ok(());
416        }
417
418        info!("Stopping PostgreSQL replication source: {}", self.base.id);
419
420        self.base.set_status(ComponentStatus::Stopping).await;
421
422        // Cancel the replication task
423        if let Some(task) = self.base.task_handle.write().await.take() {
424            task.abort();
425        }
426
427        self.base.set_status(ComponentStatus::Stopped).await;
428        self.base
429            .send_component_event(
430                ComponentStatus::Stopped,
431                Some("PostgreSQL replication stopped".to_string()),
432            )
433            .await?;
434
435        Ok(())
436    }
437
438    async fn status(&self) -> ComponentStatus {
439        self.base.get_status().await
440    }
441
442    async fn subscribe(
443        &self,
444        settings: drasi_lib::config::SourceSubscriptionSettings,
445    ) -> Result<SubscriptionResponse> {
446        self.base
447            .subscribe_with_bootstrap(&settings, "PostgreSQL")
448            .await
449    }
450
451    fn as_any(&self) -> &dyn std::any::Any {
452        self
453    }
454
455    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
456        self.base.initialize(context).await;
457    }
458
459    async fn set_bootstrap_provider(
460        &self,
461        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
462    ) {
463        self.base.set_bootstrap_provider(provider).await;
464    }
465}
466
467async fn run_replication(
468    source_id: String,
469    config: PostgresSourceConfig,
470    dispatchers: Arc<
471        RwLock<
472            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
473        >,
474    >,
475    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
476    status: Arc<RwLock<ComponentStatus>>,
477) -> Result<()> {
478    info!("Starting replication for source {source_id}");
479
480    let mut stream =
481        stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
482
483    stream.run().await
484}
485
486/// Builder for PostgreSQL source configuration.
487///
488/// Provides a fluent API for constructing PostgreSQL source configurations
489/// with sensible defaults.
490///
491/// # Example
492///
493/// ```rust,ignore
494/// use drasi_source_postgres::PostgresReplicationSource;
495///
496/// let source = PostgresReplicationSource::builder("pg-source")
497///     .with_host("db.example.com")
498///     .with_database("production")
499///     .with_user("replication_user")
500///     .with_password("secret")
501///     .with_tables(vec!["users".to_string(), "orders".to_string()])
502///     .with_slot_name("my_slot")
503///     .build()?;
504/// ```
505pub struct PostgresSourceBuilder {
506    id: String,
507    host: String,
508    port: u16,
509    database: String,
510    user: String,
511    password: String,
512    tables: Vec<String>,
513    slot_name: String,
514    publication_name: String,
515    ssl_mode: SslMode,
516    table_keys: Vec<TableKeyConfig>,
517    dispatch_mode: Option<DispatchMode>,
518    dispatch_buffer_capacity: Option<usize>,
519    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
520    auto_start: bool,
521}
522
523impl PostgresSourceBuilder {
524    /// Create a new PostgreSQL source builder with the given ID and default values
525    pub fn new(id: impl Into<String>) -> Self {
526        Self {
527            id: id.into(),
528            host: "localhost".to_string(),
529            port: 5432,
530            database: String::new(),
531            user: String::new(),
532            password: String::new(),
533            tables: Vec::new(),
534            slot_name: "drasi_slot".to_string(),
535            publication_name: "drasi_publication".to_string(),
536            ssl_mode: SslMode::default(),
537            table_keys: Vec::new(),
538            dispatch_mode: None,
539            dispatch_buffer_capacity: None,
540            bootstrap_provider: None,
541            auto_start: true,
542        }
543    }
544
545    /// Set the PostgreSQL host
546    pub fn with_host(mut self, host: impl Into<String>) -> Self {
547        self.host = host.into();
548        self
549    }
550
551    /// Set the PostgreSQL port
552    pub fn with_port(mut self, port: u16) -> Self {
553        self.port = port;
554        self
555    }
556
557    /// Set the database name
558    pub fn with_database(mut self, database: impl Into<String>) -> Self {
559        self.database = database.into();
560        self
561    }
562
563    /// Set the database user
564    pub fn with_user(mut self, user: impl Into<String>) -> Self {
565        self.user = user.into();
566        self
567    }
568
569    /// Set the database password
570    pub fn with_password(mut self, password: impl Into<String>) -> Self {
571        self.password = password.into();
572        self
573    }
574
575    /// Set the tables to replicate
576    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
577        self.tables = tables;
578        self
579    }
580
581    /// Add a table to replicate
582    pub fn add_table(mut self, table: impl Into<String>) -> Self {
583        self.tables.push(table.into());
584        self
585    }
586
587    /// Set the replication slot name
588    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
589        self.slot_name = slot_name.into();
590        self
591    }
592
593    /// Set the publication name
594    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
595        self.publication_name = publication_name.into();
596        self
597    }
598
599    /// Set the SSL mode
600    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
601        self.ssl_mode = ssl_mode;
602        self
603    }
604
605    /// Set the table key configurations
606    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
607        self.table_keys = table_keys;
608        self
609    }
610
611    /// Add a table key configuration
612    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
613        self.table_keys.push(table_key);
614        self
615    }
616
617    /// Set the dispatch mode for this source
618    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
619        self.dispatch_mode = Some(mode);
620        self
621    }
622
623    /// Set the dispatch buffer capacity for this source
624    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
625        self.dispatch_buffer_capacity = Some(capacity);
626        self
627    }
628
629    /// Set the bootstrap provider for this source
630    pub fn with_bootstrap_provider(
631        mut self,
632        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
633    ) -> Self {
634        self.bootstrap_provider = Some(Box::new(provider));
635        self
636    }
637
638    /// Set whether this source should auto-start when DrasiLib starts.
639    ///
640    /// Default is `true`. Set to `false` if this source should only be
641    /// started manually via `start_source()`.
642    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
643        self.auto_start = auto_start;
644        self
645    }
646
647    /// Set the full configuration at once
648    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
649        self.host = config.host;
650        self.port = config.port;
651        self.database = config.database;
652        self.user = config.user;
653        self.password = config.password;
654        self.tables = config.tables;
655        self.slot_name = config.slot_name;
656        self.publication_name = config.publication_name;
657        self.ssl_mode = config.ssl_mode;
658        self.table_keys = config.table_keys;
659        self
660    }
661
662    /// Build the PostgreSQL replication source
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the source cannot be constructed.
667    pub fn build(self) -> Result<PostgresReplicationSource> {
668        let config = PostgresSourceConfig {
669            host: self.host,
670            port: self.port,
671            database: self.database,
672            user: self.user,
673            password: self.password,
674            tables: self.tables,
675            slot_name: self.slot_name,
676            publication_name: self.publication_name,
677            ssl_mode: self.ssl_mode,
678            table_keys: self.table_keys,
679        };
680
681        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
682        if let Some(mode) = self.dispatch_mode {
683            params = params.with_dispatch_mode(mode);
684        }
685        if let Some(capacity) = self.dispatch_buffer_capacity {
686            params = params.with_dispatch_buffer_capacity(capacity);
687        }
688        if let Some(provider) = self.bootstrap_provider {
689            params = params.with_bootstrap_provider(provider);
690        }
691
692        Ok(PostgresReplicationSource {
693            base: SourceBase::new(params)?,
694            config,
695        })
696    }
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder pre-populated with the `db` / `user` pair shared by many tests.
    fn minimal_builder(id: &str) -> PostgresSourceBuilder {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
    }

    /// Hand-written configuration carrying the default settings plus a test
    /// database and user.
    fn sample_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    /// Two-table list reused by the table-related tests.
    fn users_and_orders() -> Vec<String> {
        vec!["users".to_string(), "orders".to_string()]
    }

    // Constructing sources through the builder and `with_dispatch`.
    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let built = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret")
                .build()
                .unwrap();
            assert_eq!(built.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let built = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                sample_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(built.is_ok());
            assert_eq!(built.unwrap().id(), "dispatch-source");
        }
    }

    // `Source` accessors: id, type name and exported properties.
    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            let built = minimal_builder("my-pg-source").build().unwrap();
            assert_eq!(built.id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            let built = minimal_builder("test").build().unwrap();
            assert_eq!(built.type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let exported = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap()
                .properties();

            assert_eq!(
                exported.get("host"),
                Some(&serde_json::json!("db.example.com"))
            );
            assert_eq!(exported.get("port"), Some(&serde_json::json!(5433)));
            assert_eq!(exported.get("database"), Some(&serde_json::json!("mydb")));
            assert_eq!(exported.get("user"), Some(&serde_json::json!("app_user")));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let exported = minimal_builder("test")
                .with_password("super_secret_password")
                .build()
                .unwrap()
                .properties();

            // Password should not be exposed in properties
            assert!(!exported.contains_key("password"));
        }

        #[test]
        fn test_properties_includes_tables() {
            let exported = minimal_builder("test")
                .with_tables(users_and_orders())
                .build()
                .unwrap()
                .properties();

            let listed = exported.get("tables").unwrap().as_array().unwrap();
            assert_eq!(listed.len(), 2);
            assert_eq!(listed[0], "users");
            assert_eq!(listed[1], "orders");
        }
    }

    // Lifecycle state of a freshly built source.
    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let built = minimal_builder("test").build().unwrap();
            assert_eq!(built.status().await, ComponentStatus::Stopped);
        }
    }

    // Builder defaults and setters, checked against the stored configuration.
    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let cfg = PostgresSourceBuilder::new("test").build().unwrap().config;
            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let cfg = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(users_and_orders())
                .build()
                .unwrap()
                .config;

            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables.len(), 2);
            assert_eq!(cfg.tables[0], "users");
            assert_eq!(cfg.tables[1], "orders");
        }

        #[test]
        fn test_builder_add_table() {
            let cfg = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap()
                .config;

            assert_eq!(cfg.tables.len(), 3);
            assert_eq!(cfg.tables[0], "table1");
            assert_eq!(cfg.tables[1], "table2");
            assert_eq!(cfg.tables[2], "table3");
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let cfg = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap()
                .config;

            assert_eq!(cfg.slot_name, "custom_slot");
            assert_eq!(cfg.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            let built = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(built.base.id, "my-pg-source");
        }
    }

    // Serde round-trips and defaulting of `PostgresSourceConfig`.
    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let original = sample_config();

            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: PostgresSourceConfig = serde_json::from_str(&encoded).unwrap();

            assert_eq!(original, decoded);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let raw = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(raw).unwrap();

            assert_eq!(parsed.database, "mydb");
            assert_eq!(parsed.user, "myuser");
            assert_eq!(parsed.host, "localhost"); // default
            assert_eq!(parsed.port, 5432); // default
            assert_eq!(parsed.slot_name, "drasi_slot"); // default
        }

        #[test]
        fn test_config_deserialization_full() {
            let raw = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(raw).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, vec!["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}