Skip to main content

drasi_source_postgres/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PostgreSQL Replication Source Plugin for Drasi
17//!
18//! This plugin captures data changes from PostgreSQL databases using logical replication.
19//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
20//! messages in real-time, converting them to Drasi source change events.
21//!
22//! # Prerequisites
23//!
24//! Before using this source, you must configure PostgreSQL for logical replication:
25//!
26//! 1. **Enable logical replication** in `postgresql.conf`:
27//!    ```text
28//!    wal_level = logical
29//!    max_replication_slots = 10
30//!    max_wal_senders = 10
31//!    ```
32//!
33//! 2. **Create a publication** for the tables you want to monitor:
34//!    ```sql
35//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
36//!    ```
37//!
38//! 3. **Create a replication slot** (optional - the source can create one automatically):
39//!    ```sql
40//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
41//!    ```
42//!
43//! 4. **Grant replication permissions** to the database user:
44//!    ```sql
45//!    ALTER ROLE drasi_user REPLICATION;
46//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
47//!    ```
48//!
49//! # Architecture
50//!
51//! The source has two main components:
52//!
53//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
54//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
55//!   ensure consistency.
56//!
57//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
58//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
59//!
60//! # Configuration
61//!
62//! | Field | Type | Default | Description |
63//! |-------|------|---------|-------------|
64//! | `host` | string | `"localhost"` | PostgreSQL host |
65//! | `port` | u16 | `5432` | PostgreSQL port |
66//! | `database` | string | *required* | Database name |
67//! | `user` | string | *required* | Database user (must have replication permission) |
68//! | `password` | string | `""` | Database password |
69//! | `tables` | string[] | `[]` | Tables to replicate |
70//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
71//! | `publication_name` | string | `"drasi_publication"` | Publication name |
72//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
73//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
74//!
75//! # Example Configuration (YAML)
76//!
77//! ```yaml
78//! source_type: postgres
79//! properties:
80//!   host: db.example.com
81//!   port: 5432
82//!   database: production
83//!   user: replication_user
84//!   password: secret
85//!   tables:
86//!     - users
87//!     - orders
88//!   slot_name: drasi_slot
89//!   publication_name: drasi_publication
90//!   table_keys:
91//!     - table: users
92//!       key_columns: [id]
93//!     - table: orders
94//!       key_columns: [order_id]
95//! ```
96//!
97//! # Data Format
98//!
99//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
100//! Each row change is mapped as follows:
101//!
102//! ## Node Mapping
103//!
104//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
105//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
106//! - **Properties**: All columns from the row (column names become property keys)
107//!
108//! ## WAL Message to SourceChange
109//!
110//! | WAL Operation | SourceChange |
111//! |---------------|--------------|
112//! | INSERT | `SourceChange::Insert { element: Node }` |
113//! | UPDATE | `SourceChange::Update { element: Node }` |
114//! | DELETE | `SourceChange::Delete { metadata }` |
115//!
116//! ## Example Mapping
117//!
118//! Given a PostgreSQL table:
119//!
120//! ```sql
121//! CREATE TABLE users (
122//!     id SERIAL PRIMARY KEY,
123//!     name VARCHAR(100),
124//!     email VARCHAR(255),
125//!     age INTEGER
126//! );
127//!
128//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
129//! ```
130//!
131//! Produces a SourceChange equivalent to:
132//!
133//! ```json
134//! {
135//!     "type": "Insert",
136//!     "element": {
137//!         "metadata": {
138//!             "element_id": "public:users:1",
139//!             "source_id": "pg-source",
140//!             "labels": ["users"],
141//!             "effective_from": 1699900000000000
142//!         },
143//!         "properties": {
144//!             "id": 1,
145//!             "name": "Alice",
146//!             "email": "alice@example.com",
147//!             "age": 30
148//!         }
149//!     }
150//! }
151//! ```
152//!
153//! # Usage Example
154//!
155//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! // `builder` takes the source id; `build` returns the ready-to-use source.
//! let source = PostgresReplicationSource::builder("pg-source")
//!     .with_host("db.example.com")
//!     .with_database("production")
//!     .with_user("replication_user")
//!     .with_password("secret")
//!     .with_tables(vec!["users".to_string(), "orders".to_string()])
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
169//! ```
170
171pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
191use drasi_lib::Source;
192use tracing::Instrument;
193
194/// PostgreSQL replication source that captures changes via logical replication.
195///
196/// This source connects to PostgreSQL using the replication protocol and decodes
197/// WAL messages in real-time, converting them to Drasi source change events.
198///
199/// # Fields
200///
201/// - `base`: Common source functionality (dispatchers, status, lifecycle)
202/// - `config`: PostgreSQL connection and replication configuration
203pub struct PostgresReplicationSource {
204    /// Base source implementation providing common functionality
205    base: SourceBase,
206    /// PostgreSQL source configuration
207    config: PostgresSourceConfig,
208}
209
210impl PostgresReplicationSource {
211    /// Create a builder for PostgresReplicationSource
212    ///
213    /// # Example
214    ///
215    /// ```rust,ignore
216    /// use drasi_source_postgres::PostgresReplicationSource;
217    ///
218    /// let source = PostgresReplicationSource::builder("pg-source")
219    ///     .with_host("db.example.com")
220    ///     .with_database("production")
221    ///     .with_user("replication_user")
222    ///     .with_password("secret")
223    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
224    ///     .with_bootstrap_provider(my_provider)
225    ///     .build()?;
226    /// ```
227    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
228        PostgresSourceBuilder::new(id)
229    }
230
231    /// Create a new PostgreSQL replication source.
232    ///
233    /// The event channel is automatically injected when the source is added
234    /// to DrasiLib via `add_source()`.
235    ///
236    /// # Arguments
237    ///
238    /// * `id` - Unique identifier for this source instance
239    /// * `config` - PostgreSQL source configuration
240    ///
241    /// # Returns
242    ///
243    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the base source cannot be initialized.
248    ///
249    /// # Example
250    ///
251    /// ```rust,ignore
252    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
253    ///
254    /// let config = PostgresSourceBuilder::new()
255    ///     .with_host("db.example.com")
256    ///     .with_database("mydb")
257    ///     .with_user("replication_user")
258    ///     .build();
259    ///
260    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
261    /// ```
262    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
263        let id = id.into();
264        let params = SourceBaseParams::new(id);
265        Ok(Self {
266            base: SourceBase::new(params)?,
267            config,
268        })
269    }
270
271    /// Create a new PostgreSQL replication source with custom dispatch settings
272    ///
273    /// The event channel is automatically injected when the source is added
274    /// to DrasiLib via `add_source()`.
275    pub fn with_dispatch(
276        id: impl Into<String>,
277        config: PostgresSourceConfig,
278        dispatch_mode: Option<DispatchMode>,
279        dispatch_buffer_capacity: Option<usize>,
280    ) -> Result<Self> {
281        let id = id.into();
282        let mut params = SourceBaseParams::new(id);
283        if let Some(mode) = dispatch_mode {
284            params = params.with_dispatch_mode(mode);
285        }
286        if let Some(capacity) = dispatch_buffer_capacity {
287            params = params.with_dispatch_buffer_capacity(capacity);
288        }
289        Ok(Self {
290            base: SourceBase::new(params)?,
291            config,
292        })
293    }
294}
295
296#[async_trait]
297impl Source for PostgresReplicationSource {
298    fn id(&self) -> &str {
299        &self.base.id
300    }
301
302    fn type_name(&self) -> &str {
303        "postgres"
304    }
305
306    fn properties(&self) -> HashMap<String, serde_json::Value> {
307        let mut props = HashMap::new();
308        props.insert(
309            "host".to_string(),
310            serde_json::Value::String(self.config.host.clone()),
311        );
312        props.insert(
313            "port".to_string(),
314            serde_json::Value::Number(self.config.port.into()),
315        );
316        props.insert(
317            "database".to_string(),
318            serde_json::Value::String(self.config.database.clone()),
319        );
320        props.insert(
321            "user".to_string(),
322            serde_json::Value::String(self.config.user.clone()),
323        );
324        // Don't expose password in properties
325        props.insert(
326            "tables".to_string(),
327            serde_json::Value::Array(
328                self.config
329                    .tables
330                    .iter()
331                    .map(|t| serde_json::Value::String(t.clone()))
332                    .collect(),
333            ),
334        );
335        props
336    }
337
338    fn auto_start(&self) -> bool {
339        self.base.get_auto_start()
340    }
341
342    async fn start(&self) -> Result<()> {
343        if self.base.get_status().await == ComponentStatus::Running {
344            return Ok(());
345        }
346
347        self.base.set_status(ComponentStatus::Starting).await;
348        info!("Starting PostgreSQL replication source: {}", self.base.id);
349
350        let config = self.config.clone();
351        let source_id = self.base.id.clone();
352        let dispatchers = self.base.dispatchers.clone();
353        let status_tx = self.base.status_tx();
354        let status_clone = self.base.status.clone();
355
356        // Get instance_id from context for log routing isolation
357        let instance_id = self
358            .base
359            .context()
360            .await
361            .map(|c| c.instance_id)
362            .unwrap_or_default();
363
364        // Create span for spawned task so log::info!, log::error! etc are routed
365        let source_id_for_span = source_id.clone();
366        let span = tracing::info_span!(
367            "postgres_replication_task",
368            instance_id = %instance_id,
369            component_id = %source_id_for_span,
370            component_type = "source"
371        );
372
373        let task = tokio::spawn(
374            async move {
375                if let Err(e) = run_replication(
376                    source_id.clone(),
377                    config,
378                    dispatchers,
379                    status_tx.clone(),
380                    status_clone.clone(),
381                )
382                .await
383                {
384                    error!("Replication task failed for {source_id}: {e}");
385                    *status_clone.write().await = ComponentStatus::Error;
386                    if let Some(ref tx) = *status_tx.read().await {
387                        let _ = tx
388                            .send(ComponentEvent {
389                                component_id: source_id,
390                                component_type: ComponentType::Source,
391                                status: ComponentStatus::Error,
392                                timestamp: chrono::Utc::now(),
393                                message: Some(format!("Replication failed: {e}")),
394                            })
395                            .await;
396                    }
397                }
398            }
399            .instrument(span),
400        );
401
402        *self.base.task_handle.write().await = Some(task);
403        self.base.set_status(ComponentStatus::Running).await;
404
405        self.base
406            .send_component_event(
407                ComponentStatus::Running,
408                Some("PostgreSQL replication started".to_string()),
409            )
410            .await?;
411
412        Ok(())
413    }
414
415    async fn stop(&self) -> Result<()> {
416        if self.base.get_status().await != ComponentStatus::Running {
417            return Ok(());
418        }
419
420        info!("Stopping PostgreSQL replication source: {}", self.base.id);
421
422        self.base.set_status(ComponentStatus::Stopping).await;
423
424        // Cancel the replication task
425        if let Some(task) = self.base.task_handle.write().await.take() {
426            task.abort();
427        }
428
429        self.base.set_status(ComponentStatus::Stopped).await;
430        self.base
431            .send_component_event(
432                ComponentStatus::Stopped,
433                Some("PostgreSQL replication stopped".to_string()),
434            )
435            .await?;
436
437        Ok(())
438    }
439
440    async fn status(&self) -> ComponentStatus {
441        self.base.get_status().await
442    }
443
444    async fn subscribe(
445        &self,
446        settings: drasi_lib::config::SourceSubscriptionSettings,
447    ) -> Result<SubscriptionResponse> {
448        self.base
449            .subscribe_with_bootstrap(&settings, "PostgreSQL")
450            .await
451    }
452
453    fn as_any(&self) -> &dyn std::any::Any {
454        self
455    }
456
457    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
458        self.base.initialize(context).await;
459    }
460
461    async fn set_bootstrap_provider(
462        &self,
463        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
464    ) {
465        self.base.set_bootstrap_provider(provider).await;
466    }
467}
468
469async fn run_replication(
470    source_id: String,
471    config: PostgresSourceConfig,
472    dispatchers: Arc<
473        RwLock<
474            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
475        >,
476    >,
477    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
478    status: Arc<RwLock<ComponentStatus>>,
479) -> Result<()> {
480    info!("Starting replication for source {source_id}");
481
482    let mut stream =
483        stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
484
485    stream.run().await
486}
487
488/// Builder for PostgreSQL source configuration.
489///
490/// Provides a fluent API for constructing PostgreSQL source configurations
491/// with sensible defaults.
492///
493/// # Example
494///
495/// ```rust,ignore
496/// use drasi_source_postgres::PostgresReplicationSource;
497///
498/// let source = PostgresReplicationSource::builder("pg-source")
499///     .with_host("db.example.com")
500///     .with_database("production")
501///     .with_user("replication_user")
502///     .with_password("secret")
503///     .with_tables(vec!["users".to_string(), "orders".to_string()])
504///     .with_slot_name("my_slot")
505///     .build()?;
506/// ```
507pub struct PostgresSourceBuilder {
508    id: String,
509    host: String,
510    port: u16,
511    database: String,
512    user: String,
513    password: String,
514    tables: Vec<String>,
515    slot_name: String,
516    publication_name: String,
517    ssl_mode: SslMode,
518    table_keys: Vec<TableKeyConfig>,
519    dispatch_mode: Option<DispatchMode>,
520    dispatch_buffer_capacity: Option<usize>,
521    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
522    auto_start: bool,
523}
524
525impl PostgresSourceBuilder {
526    /// Create a new PostgreSQL source builder with the given ID and default values
527    pub fn new(id: impl Into<String>) -> Self {
528        Self {
529            id: id.into(),
530            host: "localhost".to_string(),
531            port: 5432,
532            database: String::new(),
533            user: String::new(),
534            password: String::new(),
535            tables: Vec::new(),
536            slot_name: "drasi_slot".to_string(),
537            publication_name: "drasi_publication".to_string(),
538            ssl_mode: SslMode::default(),
539            table_keys: Vec::new(),
540            dispatch_mode: None,
541            dispatch_buffer_capacity: None,
542            bootstrap_provider: None,
543            auto_start: true,
544        }
545    }
546
547    /// Set the PostgreSQL host
548    pub fn with_host(mut self, host: impl Into<String>) -> Self {
549        self.host = host.into();
550        self
551    }
552
553    /// Set the PostgreSQL port
554    pub fn with_port(mut self, port: u16) -> Self {
555        self.port = port;
556        self
557    }
558
559    /// Set the database name
560    pub fn with_database(mut self, database: impl Into<String>) -> Self {
561        self.database = database.into();
562        self
563    }
564
565    /// Set the database user
566    pub fn with_user(mut self, user: impl Into<String>) -> Self {
567        self.user = user.into();
568        self
569    }
570
571    /// Set the database password
572    pub fn with_password(mut self, password: impl Into<String>) -> Self {
573        self.password = password.into();
574        self
575    }
576
577    /// Set the tables to replicate
578    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
579        self.tables = tables;
580        self
581    }
582
583    /// Add a table to replicate
584    pub fn add_table(mut self, table: impl Into<String>) -> Self {
585        self.tables.push(table.into());
586        self
587    }
588
589    /// Set the replication slot name
590    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
591        self.slot_name = slot_name.into();
592        self
593    }
594
595    /// Set the publication name
596    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
597        self.publication_name = publication_name.into();
598        self
599    }
600
601    /// Set the SSL mode
602    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
603        self.ssl_mode = ssl_mode;
604        self
605    }
606
607    /// Set the table key configurations
608    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
609        self.table_keys = table_keys;
610        self
611    }
612
613    /// Add a table key configuration
614    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
615        self.table_keys.push(table_key);
616        self
617    }
618
619    /// Set the dispatch mode for this source
620    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
621        self.dispatch_mode = Some(mode);
622        self
623    }
624
625    /// Set the dispatch buffer capacity for this source
626    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
627        self.dispatch_buffer_capacity = Some(capacity);
628        self
629    }
630
631    /// Set the bootstrap provider for this source
632    pub fn with_bootstrap_provider(
633        mut self,
634        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
635    ) -> Self {
636        self.bootstrap_provider = Some(Box::new(provider));
637        self
638    }
639
640    /// Set whether this source should auto-start when DrasiLib starts.
641    ///
642    /// Default is `true`. Set to `false` if this source should only be
643    /// started manually via `start_source()`.
644    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
645        self.auto_start = auto_start;
646        self
647    }
648
649    /// Set the full configuration at once
650    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
651        self.host = config.host;
652        self.port = config.port;
653        self.database = config.database;
654        self.user = config.user;
655        self.password = config.password;
656        self.tables = config.tables;
657        self.slot_name = config.slot_name;
658        self.publication_name = config.publication_name;
659        self.ssl_mode = config.ssl_mode;
660        self.table_keys = config.table_keys;
661        self
662    }
663
664    /// Build the PostgreSQL replication source
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the source cannot be constructed.
669    pub fn build(self) -> Result<PostgresReplicationSource> {
670        let config = PostgresSourceConfig {
671            host: self.host,
672            port: self.port,
673            database: self.database,
674            user: self.user,
675            password: self.password,
676            tables: self.tables,
677            slot_name: self.slot_name,
678            publication_name: self.publication_name,
679            ssl_mode: self.ssl_mode,
680            table_keys: self.table_keys,
681        };
682
683        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
684        if let Some(mode) = self.dispatch_mode {
685            params = params.with_dispatch_mode(mode);
686        }
687        if let Some(capacity) = self.dispatch_buffer_capacity {
688            params = params.with_dispatch_buffer_capacity(capacity);
689        }
690        if let Some(provider) = self.bootstrap_provider {
691            params = params.with_bootstrap_provider(provider);
692        }
693
694        Ok(PostgresReplicationSource {
695            base: SourceBase::new(params)?,
696            config,
697        })
698    }
699}
700
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder pre-filled with the placeholder database/user shared by most tests.
    fn basic_builder(id: &str) -> PostgresSourceBuilder {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
    }

    /// Local test configuration used by the dispatch and serialization tests.
    fn local_test_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let builder = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret");
            let source = builder.build().unwrap();
            assert_eq!(source.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let built = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                local_test_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(built.is_ok());
            assert_eq!(built.unwrap().id(), "dispatch-source");
        }
    }

    mod properties {
        use super::*;

        /// Shorthand for the JSON string value the property assertions compare against.
        fn json_str(text: &str) -> serde_json::Value {
            serde_json::Value::String(text.to_string())
        }

        #[test]
        fn test_id_returns_correct_value() {
            let source = basic_builder("my-pg-source").build().unwrap();
            assert_eq!(source.id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            let source = basic_builder("test").build().unwrap();
            assert_eq!(source.type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap();
            let props = source.properties();

            assert_eq!(props.get("host"), Some(&json_str("db.example.com")));
            assert_eq!(
                props.get("port"),
                Some(&serde_json::Value::Number(5433.into()))
            );
            assert_eq!(props.get("database"), Some(&json_str("mydb")));
            assert_eq!(props.get("user"), Some(&json_str("app_user")));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let source = basic_builder("test")
                .with_password("super_secret_password")
                .build()
                .unwrap();

            // Password should not be exposed in properties
            let props = source.properties();
            assert!(!props.contains_key("password"));
        }

        #[test]
        fn test_properties_includes_tables() {
            let source = basic_builder("test")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let props = source.properties();

            let tables = props.get("tables").unwrap().as_array().unwrap();
            let names: Vec<Option<&str>> = tables.iter().map(|value| value.as_str()).collect();
            assert_eq!(names, [Some("users"), Some("orders")]);
        }
    }

    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = basic_builder("test").build().unwrap();
            let status = source.status().await;
            assert_eq!(status, ComponentStatus::Stopped);
        }
    }

    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let source = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;
            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();

            let cfg = &source.config;
            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables, ["users", "orders"]);
        }

        #[test]
        fn test_builder_add_table() {
            let source = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();

            assert_eq!(source.config.tables, ["table1", "table2", "table3"]);
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let source = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            let cfg = &source.config;
            assert_eq!(cfg.slot_name, "custom_slot");
            assert_eq!(cfg.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            let source = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-pg-source");
        }
    }

    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let original = local_test_config();

            let json = serde_json::to_string(&original).unwrap();
            let round_tripped: PostgresSourceConfig = serde_json::from_str(&json).unwrap();

            assert_eq!(original, round_tripped);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.database, "mydb");
            assert_eq!(parsed.user, "myuser");
            // Everything omitted from the JSON falls back to its default.
            assert_eq!(parsed.host, "localhost");
            assert_eq!(parsed.port, 5432);
            assert_eq!(parsed.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, ["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}
991
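/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled; exports
/// `descriptor::PostgresSourceDescriptor` as this plugin's only source descriptor.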
995#[cfg(feature = "dynamic-plugin")]
996drasi_plugin_sdk::export_plugin!(
997    plugin_id = "postgres-source",
998    core_version = env!("CARGO_PKG_VERSION"),
999    lib_version = env!("CARGO_PKG_VERSION"),
1000    plugin_version = env!("CARGO_PKG_VERSION"),
1001    source_descriptors = [descriptor::PostgresSourceDescriptor],
1002    reaction_descriptors = [],
1003    bootstrap_descriptors = [],
1004);