Skip to main content

drasi_source_postgres/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PostgreSQL Replication Source Plugin for Drasi
17//!
18//! This plugin captures data changes from PostgreSQL databases using logical replication.
19//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
20//! messages in real-time, converting them to Drasi source change events.
21//!
22//! # Prerequisites
23//!
24//! Before using this source, you must configure PostgreSQL for logical replication:
25//!
26//! 1. **Enable logical replication** in `postgresql.conf`:
27//!    ```text
28//!    wal_level = logical
29//!    max_replication_slots = 10
30//!    max_wal_senders = 10
31//!    ```
32//!
33//! 2. **Create a publication** for the tables you want to monitor:
34//!    ```sql
35//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
36//!    ```
37//!
38//! 3. **Create a replication slot** (optional - the source can create one automatically):
39//!    ```sql
40//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
41//!    ```
42//!
43//! 4. **Grant replication permissions** to the database user:
44//!    ```sql
45//!    ALTER ROLE drasi_user REPLICATION;
46//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
47//!    ```
48//!
49//! # Architecture
50//!
51//! The source has two main components:
52//!
53//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
54//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
55//!   ensure consistency.
56//!
57//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
58//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
59//!
60//! # Configuration
61//!
62//! | Field | Type | Default | Description |
63//! |-------|------|---------|-------------|
64//! | `host` | string | `"localhost"` | PostgreSQL host |
65//! | `port` | u16 | `5432` | PostgreSQL port |
66//! | `database` | string | *required* | Database name |
67//! | `user` | string | *required* | Database user (must have replication permission) |
68//! | `password` | string | `""` | Database password |
69//! | `tables` | string[] | `[]` | Tables to replicate |
70//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
71//! | `publication_name` | string | `"drasi_publication"` | Publication name |
72//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
73//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
74//!
75//! # Example Configuration (YAML)
76//!
77//! ```yaml
78//! source_type: postgres
79//! properties:
80//!   host: db.example.com
81//!   port: 5432
82//!   database: production
83//!   user: replication_user
84//!   password: secret
85//!   tables:
86//!     - users
87//!     - orders
88//!   slot_name: drasi_slot
89//!   publication_name: drasi_publication
90//!   table_keys:
91//!     - table: users
92//!       key_columns: [id]
93//!     - table: orders
94//!       key_columns: [order_id]
95//! ```
96//!
97//! # Data Format
98//!
99//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
100//! Each row change is mapped as follows:
101//!
102//! ## Node Mapping
103//!
104//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
105//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
106//! - **Properties**: All columns from the row (column names become property keys)
107//!
108//! ## WAL Message to SourceChange
109//!
110//! | WAL Operation | SourceChange |
111//! |---------------|--------------|
112//! | INSERT | `SourceChange::Insert { element: Node }` |
113//! | UPDATE | `SourceChange::Update { element: Node }` |
114//! | DELETE | `SourceChange::Delete { metadata }` |
115//!
116//! ## Example Mapping
117//!
118//! Given a PostgreSQL table:
119//!
120//! ```sql
121//! CREATE TABLE users (
122//!     id SERIAL PRIMARY KEY,
123//!     name VARCHAR(100),
124//!     email VARCHAR(255),
125//!     age INTEGER
126//! );
127//!
128//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
129//! ```
130//!
131//! Produces a SourceChange equivalent to:
132//!
133//! ```json
134//! {
135//!     "type": "Insert",
136//!     "element": {
137//!         "metadata": {
138//!             "element_id": "public:users:1",
139//!             "source_id": "pg-source",
140//!             "labels": ["users"],
141//!             "effective_from": 1699900000000000
142//!         },
143//!         "properties": {
144//!             "id": 1,
145//!             "name": "Alice",
146//!             "email": "alice@example.com",
147//!             "age": 30
148//!         }
149//!     }
150//! }
151//! ```
152//!
153//! # Usage Example
154//!
//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! // The builder takes the source id up front and `build()` returns the
//! // ready-to-use source (not a bare configuration).
//! let source = Arc::new(
//!     PostgresReplicationSource::builder("pg-source")
//!         .with_host("db.example.com")
//!         .with_database("production")
//!         .with_user("replication_user")
//!         .with_password("secret")
//!         .with_tables(vec!["users".to_string(), "orders".to_string()])
//!         .build()?,
//! );
//!
//! drasi.add_source(source).await?;
//! ```
170
171pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::component_graph::ComponentStatusHandle;
191use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
192use drasi_lib::Source;
193use tracing::Instrument;
194
195/// PostgreSQL replication source that captures changes via logical replication.
196///
197/// This source connects to PostgreSQL using the replication protocol and decodes
198/// WAL messages in real-time, converting them to Drasi source change events.
199///
200/// # Fields
201///
202/// - `base`: Common source functionality (dispatchers, status, lifecycle)
203/// - `config`: PostgreSQL connection and replication configuration
204pub struct PostgresReplicationSource {
205    /// Base source implementation providing common functionality
206    base: SourceBase,
207    /// PostgreSQL source configuration
208    config: PostgresSourceConfig,
209}
210
211impl PostgresReplicationSource {
212    /// Create a builder for PostgresReplicationSource
213    ///
214    /// # Example
215    ///
216    /// ```rust,ignore
217    /// use drasi_source_postgres::PostgresReplicationSource;
218    ///
219    /// let source = PostgresReplicationSource::builder("pg-source")
220    ///     .with_host("db.example.com")
221    ///     .with_database("production")
222    ///     .with_user("replication_user")
223    ///     .with_password("secret")
224    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
225    ///     .with_bootstrap_provider(my_provider)
226    ///     .build()?;
227    /// ```
228    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
229        PostgresSourceBuilder::new(id)
230    }
231
232    /// Create a new PostgreSQL replication source.
233    ///
234    /// The event channel is automatically injected when the source is added
235    /// to DrasiLib via `add_source()`.
236    ///
237    /// # Arguments
238    ///
239    /// * `id` - Unique identifier for this source instance
240    /// * `config` - PostgreSQL source configuration
241    ///
242    /// # Returns
243    ///
244    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the base source cannot be initialized.
249    ///
250    /// # Example
251    ///
252    /// ```rust,ignore
253    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
254    ///
255    /// let config = PostgresSourceBuilder::new()
256    ///     .with_host("db.example.com")
257    ///     .with_database("mydb")
258    ///     .with_user("replication_user")
259    ///     .build();
260    ///
261    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
262    /// ```
263    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
264        let id = id.into();
265        let params = SourceBaseParams::new(id);
266        Ok(Self {
267            base: SourceBase::new(params)?,
268            config,
269        })
270    }
271
272    /// Create a new PostgreSQL replication source with custom dispatch settings
273    ///
274    /// The event channel is automatically injected when the source is added
275    /// to DrasiLib via `add_source()`.
276    pub fn with_dispatch(
277        id: impl Into<String>,
278        config: PostgresSourceConfig,
279        dispatch_mode: Option<DispatchMode>,
280        dispatch_buffer_capacity: Option<usize>,
281    ) -> Result<Self> {
282        let id = id.into();
283        let mut params = SourceBaseParams::new(id);
284        if let Some(mode) = dispatch_mode {
285            params = params.with_dispatch_mode(mode);
286        }
287        if let Some(capacity) = dispatch_buffer_capacity {
288            params = params.with_dispatch_buffer_capacity(capacity);
289        }
290        Ok(Self {
291            base: SourceBase::new(params)?,
292            config,
293        })
294    }
295}
296
297#[async_trait]
298impl Source for PostgresReplicationSource {
299    fn id(&self) -> &str {
300        &self.base.id
301    }
302
303    fn type_name(&self) -> &str {
304        "postgres"
305    }
306
307    fn properties(&self) -> HashMap<String, serde_json::Value> {
308        match serde_json::to_value(&self.config) {
309            Ok(serde_json::Value::Object(mut map)) => {
310                map.remove("password");
311                map.into_iter().collect()
312            }
313            _ => HashMap::new(),
314        }
315    }
316
317    fn auto_start(&self) -> bool {
318        self.base.get_auto_start()
319    }
320
321    async fn start(&self) -> Result<()> {
322        if self.base.get_status().await == ComponentStatus::Running {
323            return Ok(());
324        }
325
326        self.base.set_status(ComponentStatus::Starting, None).await;
327        info!("Starting PostgreSQL replication source: {}", self.base.id);
328
329        let config = self.config.clone();
330        let source_id = self.base.id.clone();
331        let dispatchers = self.base.dispatchers.clone();
332        let reporter = self.base.status_handle();
333
334        // Get instance_id from context for log routing isolation
335        let instance_id = self
336            .base
337            .context()
338            .await
339            .map(|c| c.instance_id)
340            .unwrap_or_default();
341
342        // Create span for spawned task so log::info!, log::error! etc are routed
343        let source_id_for_span = source_id.clone();
344        let span = tracing::info_span!(
345            "postgres_replication_task",
346            instance_id = %instance_id,
347            component_id = %source_id_for_span,
348            component_type = "source"
349        );
350
351        let task = tokio::spawn(
352            async move {
353                if let Err(e) =
354                    run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
355                {
356                    error!("Replication task failed for {source_id}: {e}");
357                    reporter
358                        .set_status(
359                            ComponentStatus::Error,
360                            Some(format!("Replication failed: {e}")),
361                        )
362                        .await;
363                }
364            }
365            .instrument(span),
366        );
367
368        *self.base.task_handle.write().await = Some(task);
369        self.base
370            .set_status(
371                ComponentStatus::Running,
372                Some("PostgreSQL replication started".to_string()),
373            )
374            .await;
375
376        Ok(())
377    }
378
379    async fn stop(&self) -> Result<()> {
380        if self.base.get_status().await != ComponentStatus::Running {
381            return Ok(());
382        }
383
384        info!("Stopping PostgreSQL replication source: {}", self.base.id);
385
386        self.base.set_status(ComponentStatus::Stopping, None).await;
387
388        // Cancel the replication task
389        if let Some(task) = self.base.task_handle.write().await.take() {
390            task.abort();
391        }
392
393        self.base
394            .set_status(
395                ComponentStatus::Stopped,
396                Some("PostgreSQL replication stopped".to_string()),
397            )
398            .await;
399
400        Ok(())
401    }
402
403    async fn status(&self) -> ComponentStatus {
404        self.base.get_status().await
405    }
406
407    async fn subscribe(
408        &self,
409        settings: drasi_lib::config::SourceSubscriptionSettings,
410    ) -> Result<SubscriptionResponse> {
411        self.base
412            .subscribe_with_bootstrap(&settings, "PostgreSQL")
413            .await
414    }
415
416    fn as_any(&self) -> &dyn std::any::Any {
417        self
418    }
419
420    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
421        self.base.initialize(context).await;
422    }
423
424    async fn set_bootstrap_provider(
425        &self,
426        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
427    ) {
428        self.base.set_bootstrap_provider(provider).await;
429    }
430}
431
432async fn run_replication(
433    source_id: String,
434    config: PostgresSourceConfig,
435    dispatchers: Arc<
436        RwLock<
437            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
438        >,
439    >,
440    status_handle: ComponentStatusHandle,
441) -> Result<()> {
442    info!("Starting replication for source {source_id}");
443
444    let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
445
446    stream.run().await
447}
448
449/// Builder for PostgreSQL source configuration.
450///
451/// Provides a fluent API for constructing PostgreSQL source configurations
452/// with sensible defaults.
453///
454/// # Example
455///
456/// ```rust,ignore
457/// use drasi_source_postgres::PostgresReplicationSource;
458///
459/// let source = PostgresReplicationSource::builder("pg-source")
460///     .with_host("db.example.com")
461///     .with_database("production")
462///     .with_user("replication_user")
463///     .with_password("secret")
464///     .with_tables(vec!["users".to_string(), "orders".to_string()])
465///     .with_slot_name("my_slot")
466///     .build()?;
467/// ```
468pub struct PostgresSourceBuilder {
469    id: String,
470    host: String,
471    port: u16,
472    database: String,
473    user: String,
474    password: String,
475    tables: Vec<String>,
476    slot_name: String,
477    publication_name: String,
478    ssl_mode: SslMode,
479    table_keys: Vec<TableKeyConfig>,
480    dispatch_mode: Option<DispatchMode>,
481    dispatch_buffer_capacity: Option<usize>,
482    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
483    auto_start: bool,
484}
485
486impl PostgresSourceBuilder {
487    /// Create a new PostgreSQL source builder with the given ID and default values
488    pub fn new(id: impl Into<String>) -> Self {
489        Self {
490            id: id.into(),
491            host: "localhost".to_string(),
492            port: 5432,
493            database: String::new(),
494            user: String::new(),
495            password: String::new(),
496            tables: Vec::new(),
497            slot_name: "drasi_slot".to_string(),
498            publication_name: "drasi_publication".to_string(),
499            ssl_mode: SslMode::default(),
500            table_keys: Vec::new(),
501            dispatch_mode: None,
502            dispatch_buffer_capacity: None,
503            bootstrap_provider: None,
504            auto_start: true,
505        }
506    }
507
508    /// Set the PostgreSQL host
509    pub fn with_host(mut self, host: impl Into<String>) -> Self {
510        self.host = host.into();
511        self
512    }
513
514    /// Set the PostgreSQL port
515    pub fn with_port(mut self, port: u16) -> Self {
516        self.port = port;
517        self
518    }
519
520    /// Set the database name
521    pub fn with_database(mut self, database: impl Into<String>) -> Self {
522        self.database = database.into();
523        self
524    }
525
526    /// Set the database user
527    pub fn with_user(mut self, user: impl Into<String>) -> Self {
528        self.user = user.into();
529        self
530    }
531
532    /// Set the database password
533    pub fn with_password(mut self, password: impl Into<String>) -> Self {
534        self.password = password.into();
535        self
536    }
537
538    /// Set the tables to replicate
539    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
540        self.tables = tables;
541        self
542    }
543
544    /// Add a table to replicate
545    pub fn add_table(mut self, table: impl Into<String>) -> Self {
546        self.tables.push(table.into());
547        self
548    }
549
550    /// Set the replication slot name
551    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
552        self.slot_name = slot_name.into();
553        self
554    }
555
556    /// Set the publication name
557    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
558        self.publication_name = publication_name.into();
559        self
560    }
561
562    /// Set the SSL mode
563    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
564        self.ssl_mode = ssl_mode;
565        self
566    }
567
568    /// Set the table key configurations
569    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
570        self.table_keys = table_keys;
571        self
572    }
573
574    /// Add a table key configuration
575    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
576        self.table_keys.push(table_key);
577        self
578    }
579
580    /// Set the dispatch mode for this source
581    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
582        self.dispatch_mode = Some(mode);
583        self
584    }
585
586    /// Set the dispatch buffer capacity for this source
587    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
588        self.dispatch_buffer_capacity = Some(capacity);
589        self
590    }
591
592    /// Set the bootstrap provider for this source
593    pub fn with_bootstrap_provider(
594        mut self,
595        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
596    ) -> Self {
597        self.bootstrap_provider = Some(Box::new(provider));
598        self
599    }
600
601    /// Set whether this source should auto-start when DrasiLib starts.
602    ///
603    /// Default is `true`. Set to `false` if this source should only be
604    /// started manually via `start_source()`.
605    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
606        self.auto_start = auto_start;
607        self
608    }
609
610    /// Set the full configuration at once
611    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
612        self.host = config.host;
613        self.port = config.port;
614        self.database = config.database;
615        self.user = config.user;
616        self.password = config.password;
617        self.tables = config.tables;
618        self.slot_name = config.slot_name;
619        self.publication_name = config.publication_name;
620        self.ssl_mode = config.ssl_mode;
621        self.table_keys = config.table_keys;
622        self
623    }
624
625    /// Build the PostgreSQL replication source
626    ///
627    /// # Errors
628    ///
629    /// Returns an error if the source cannot be constructed.
630    pub fn build(self) -> Result<PostgresReplicationSource> {
631        let config = PostgresSourceConfig {
632            host: self.host,
633            port: self.port,
634            database: self.database,
635            user: self.user,
636            password: self.password,
637            tables: self.tables,
638            slot_name: self.slot_name,
639            publication_name: self.publication_name,
640            ssl_mode: self.ssl_mode,
641            table_keys: self.table_keys,
642        };
643
644        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
645        if let Some(mode) = self.dispatch_mode {
646            params = params.with_dispatch_mode(mode);
647        }
648        if let Some(capacity) = self.dispatch_buffer_capacity {
649            params = params.with_dispatch_buffer_capacity(capacity);
650        }
651        if let Some(provider) = self.bootstrap_provider {
652            params = params.with_bootstrap_provider(provider);
653        }
654
655        Ok(PostgresReplicationSource {
656            base: SourceBase::new(params)?,
657            config,
658        })
659    }
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// Source built through the builder with only `database = "db"` and
    /// `user = "user"` set.
    fn minimal_source(id: &str) -> PostgresReplicationSource {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
            .build()
            .unwrap()
    }

    /// Fully spelled-out configuration for `testdb` / `testuser` on localhost.
    fn sample_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let built = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret")
                .build()
                .unwrap();
            assert_eq!(built.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let built = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                sample_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(built.is_ok());
            assert_eq!(built.unwrap().id(), "dispatch-source");
        }
    }

    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            assert_eq!(minimal_source("my-pg-source").id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            assert_eq!(minimal_source("test").type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let props = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap()
                .properties();

            let text = |key: &str| props.get(key).and_then(|value| value.as_str());

            assert_eq!(text("host"), Some("db.example.com"));
            assert_eq!(props.get("port").and_then(|value| value.as_u64()), Some(5433));
            assert_eq!(text("database"), Some("mydb"));
            assert_eq!(text("user"), Some("app_user"));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let props = PostgresSourceBuilder::new("test")
                .with_database("db")
                .with_user("user")
                .with_password("super_secret_password")
                .build()
                .unwrap()
                .properties();

            // Password should not be exposed in properties
            assert!(!props.contains_key("password"));
        }

        #[test]
        fn test_properties_includes_tables() {
            let props = PostgresSourceBuilder::new("test")
                .with_database("db")
                .with_user("user")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap()
                .properties();

            let tables = props.get("tables").unwrap().as_array().unwrap();
            let names: Vec<Option<&str>> = tables.iter().map(|table| table.as_str()).collect();
            assert_eq!(names, vec![Some("users"), Some("orders")]);
        }
    }

    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let status = minimal_source("test").status().await;
            assert_eq!(status, ComponentStatus::Stopped);
        }
    }

    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let built = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &built.config;
            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let built = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let cfg = &built.config;

            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables, vec!["users", "orders"]);
        }

        #[test]
        fn test_builder_add_table() {
            let built = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();

            assert_eq!(built.config.tables, vec!["table1", "table2", "table3"]);
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let built = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            assert_eq!(built.config.slot_name, "custom_slot");
            assert_eq!(built.config.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            let built = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(built.base.id, "my-pg-source");
        }
    }

    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let original = sample_config();

            let json = serde_json::to_string(&original).unwrap();
            let round_tripped: PostgresSourceConfig = serde_json::from_str(&json).unwrap();

            assert_eq!(original, round_tripped);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.database, "mydb");
            assert_eq!(parsed.user, "myuser");
            // Everything not present in the JSON falls back to its default.
            assert_eq!(parsed.host, "localhost");
            assert_eq!(parsed.port, 5432);
            assert_eq!(parsed.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, vec!["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}
952
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Exports this
/// crate as the plugin `postgres-source`, registering
/// `descriptor::PostgresSourceDescriptor` as its only source descriptor and no
/// reaction or bootstrap descriptors. All three version fields are taken from
/// this crate's `CARGO_PKG_VERSION`.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);