Skip to main content

drasi_source_postgres/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PostgreSQL Replication Source Plugin for Drasi
17//!
18//! This plugin captures data changes from PostgreSQL databases using logical replication.
19//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
20//! messages in real-time, converting them to Drasi source change events.
21//!
22//! # Prerequisites
23//!
24//! Before using this source, you must configure PostgreSQL for logical replication:
25//!
26//! 1. **Enable logical replication** in `postgresql.conf`:
27//!    ```text
28//!    wal_level = logical
29//!    max_replication_slots = 10
30//!    max_wal_senders = 10
31//!    ```
32//!
33//! 2. **Create a publication** for the tables you want to monitor:
34//!    ```sql
35//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
36//!    ```
37//!
38//! 3. **Create a replication slot** (optional - the source can create one automatically):
39//!    ```sql
40//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
41//!    ```
42//!
43//! 4. **Grant replication permissions** to the database user:
44//!    ```sql
45//!    ALTER ROLE drasi_user REPLICATION;
46//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
47//!    ```
48//!
49//! # Architecture
50//!
51//! The source has two main components:
52//!
53//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
54//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
55//!   ensure consistency.
56//!
57//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
58//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
59//!
60//! # Configuration
61//!
62//! | Field | Type | Default | Description |
63//! |-------|------|---------|-------------|
64//! | `host` | string | `"localhost"` | PostgreSQL host |
65//! | `port` | u16 | `5432` | PostgreSQL port |
66//! | `database` | string | *required* | Database name |
67//! | `user` | string | *required* | Database user (must have replication permission) |
68//! | `password` | string | `""` | Database password |
69//! | `tables` | string[] | `[]` | Tables to replicate |
70//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
71//! | `publication_name` | string | `"drasi_publication"` | Publication name |
72//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
73//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
74//!
75//! # Example Configuration (YAML)
76//!
77//! ```yaml
78//! source_type: postgres
79//! properties:
80//!   host: db.example.com
81//!   port: 5432
82//!   database: production
83//!   user: replication_user
84//!   password: secret
85//!   tables:
86//!     - users
87//!     - orders
88//!   slot_name: drasi_slot
89//!   publication_name: drasi_publication
90//!   table_keys:
91//!     - table: users
92//!       key_columns: [id]
93//!     - table: orders
94//!       key_columns: [order_id]
95//! ```
96//!
97//! # Data Format
98//!
99//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
100//! Each row change is mapped as follows:
101//!
102//! ## Node Mapping
103//!
104//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
105//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
106//! - **Properties**: All columns from the row (column names become property keys)
107//!
108//! ## WAL Message to SourceChange
109//!
110//! | WAL Operation | SourceChange |
111//! |---------------|--------------|
112//! | INSERT | `SourceChange::Insert { element: Node }` |
113//! | UPDATE | `SourceChange::Update { element: Node }` |
114//! | DELETE | `SourceChange::Delete { metadata }` |
115//!
116//! ## Example Mapping
117//!
118//! Given a PostgreSQL table:
119//!
120//! ```sql
121//! CREATE TABLE users (
122//!     id SERIAL PRIMARY KEY,
123//!     name VARCHAR(100),
124//!     email VARCHAR(255),
125//!     age INTEGER
126//! );
127//!
128//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
129//! ```
130//!
131//! Produces a SourceChange equivalent to:
132//!
133//! ```json
134//! {
135//!     "type": "Insert",
136//!     "element": {
137//!         "metadata": {
138//!             "element_id": "public:users:1",
139//!             "source_id": "pg-source",
140//!             "labels": ["users"],
141//!             "effective_from": 1699900000000000
142//!         },
143//!         "properties": {
144//!             "id": 1,
145//!             "name": "Alice",
146//!             "email": "alice@example.com",
147//!             "age": 30
148//!         }
149//!     }
150//! }
151//! ```
152//!
153//! # Usage Example
154//!
155//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! let source = Arc::new(
//!     PostgresReplicationSource::builder("pg-source")
//!         .with_host("db.example.com")
//!         .with_database("production")
//!         .with_user("replication_user")
//!         .with_password("secret")
//!         .with_tables(vec!["users".to_string(), "orders".to_string()])
//!         .build()?,
//! );
//! drasi.add_source(source).await?;
169//! ```
170
171pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::component_graph::ComponentStatusHandle;
191use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
192use drasi_lib::Source;
193use tracing::Instrument;
194
195/// PostgreSQL replication source that captures changes via logical replication.
196///
197/// This source connects to PostgreSQL using the replication protocol and decodes
198/// WAL messages in real-time, converting them to Drasi source change events.
199///
200/// # Fields
201///
202/// - `base`: Common source functionality (dispatchers, status, lifecycle)
203/// - `config`: PostgreSQL connection and replication configuration
204pub struct PostgresReplicationSource {
205    /// Base source implementation providing common functionality
206    base: SourceBase,
207    /// PostgreSQL source configuration
208    config: PostgresSourceConfig,
209}
210
211impl PostgresReplicationSource {
212    /// Create a builder for PostgresReplicationSource
213    ///
214    /// # Example
215    ///
216    /// ```rust,ignore
217    /// use drasi_source_postgres::PostgresReplicationSource;
218    ///
219    /// let source = PostgresReplicationSource::builder("pg-source")
220    ///     .with_host("db.example.com")
221    ///     .with_database("production")
222    ///     .with_user("replication_user")
223    ///     .with_password("secret")
224    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
225    ///     .with_bootstrap_provider(my_provider)
226    ///     .build()?;
227    /// ```
228    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
229        PostgresSourceBuilder::new(id)
230    }
231
232    /// Create a new PostgreSQL replication source.
233    ///
234    /// The event channel is automatically injected when the source is added
235    /// to DrasiLib via `add_source()`.
236    ///
237    /// # Arguments
238    ///
239    /// * `id` - Unique identifier for this source instance
240    /// * `config` - PostgreSQL source configuration
241    ///
242    /// # Returns
243    ///
244    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the base source cannot be initialized.
249    ///
250    /// # Example
251    ///
252    /// ```rust,ignore
253    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
254    ///
255    /// let config = PostgresSourceBuilder::new()
256    ///     .with_host("db.example.com")
257    ///     .with_database("mydb")
258    ///     .with_user("replication_user")
259    ///     .build();
260    ///
261    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
262    /// ```
263    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
264        let id = id.into();
265        let params = SourceBaseParams::new(id);
266        Ok(Self {
267            base: SourceBase::new(params)?,
268            config,
269        })
270    }
271
272    /// Create a new PostgreSQL replication source with custom dispatch settings
273    ///
274    /// The event channel is automatically injected when the source is added
275    /// to DrasiLib via `add_source()`.
276    pub fn with_dispatch(
277        id: impl Into<String>,
278        config: PostgresSourceConfig,
279        dispatch_mode: Option<DispatchMode>,
280        dispatch_buffer_capacity: Option<usize>,
281    ) -> Result<Self> {
282        let id = id.into();
283        let mut params = SourceBaseParams::new(id);
284        if let Some(mode) = dispatch_mode {
285            params = params.with_dispatch_mode(mode);
286        }
287        if let Some(capacity) = dispatch_buffer_capacity {
288            params = params.with_dispatch_buffer_capacity(capacity);
289        }
290        Ok(Self {
291            base: SourceBase::new(params)?,
292            config,
293        })
294    }
295}
296
297#[async_trait]
298impl Source for PostgresReplicationSource {
299    fn id(&self) -> &str {
300        &self.base.id
301    }
302
303    fn type_name(&self) -> &str {
304        "postgres"
305    }
306
307    fn properties(&self) -> HashMap<String, serde_json::Value> {
308        use crate::descriptor::PostgresSourceConfigDto;
309
310        self.base
311            .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
312    }
313
314    fn auto_start(&self) -> bool {
315        self.base.get_auto_start()
316    }
317
318    async fn start(&self) -> Result<()> {
319        if self.base.get_status().await == ComponentStatus::Running {
320            return Ok(());
321        }
322
323        self.base.set_status(ComponentStatus::Starting, None).await;
324        info!("Starting PostgreSQL replication source: {}", self.base.id);
325
326        let config = self.config.clone();
327        let source_id = self.base.id.clone();
328        let dispatchers = self.base.dispatchers.clone();
329        let reporter = self.base.status_handle();
330
331        // Get instance_id from context for log routing isolation
332        let instance_id = self
333            .base
334            .context()
335            .await
336            .map(|c| c.instance_id)
337            .unwrap_or_default();
338
339        // Create span for spawned task so log::info!, log::error! etc are routed
340        let source_id_for_span = source_id.clone();
341        let span = tracing::info_span!(
342            "postgres_replication_task",
343            instance_id = %instance_id,
344            component_id = %source_id_for_span,
345            component_type = "source"
346        );
347
348        let task = tokio::spawn(
349            async move {
350                if let Err(e) =
351                    run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
352                {
353                    error!("Replication task failed for {source_id}: {e}");
354                    reporter
355                        .set_status(
356                            ComponentStatus::Error,
357                            Some(format!("Replication failed: {e}")),
358                        )
359                        .await;
360                }
361            }
362            .instrument(span),
363        );
364
365        *self.base.task_handle.write().await = Some(task);
366        self.base
367            .set_status(
368                ComponentStatus::Running,
369                Some("PostgreSQL replication started".to_string()),
370            )
371            .await;
372
373        Ok(())
374    }
375
376    async fn stop(&self) -> Result<()> {
377        if self.base.get_status().await != ComponentStatus::Running {
378            return Ok(());
379        }
380
381        info!("Stopping PostgreSQL replication source: {}", self.base.id);
382
383        self.base.set_status(ComponentStatus::Stopping, None).await;
384
385        // Cancel the replication task
386        if let Some(task) = self.base.task_handle.write().await.take() {
387            task.abort();
388        }
389
390        self.base
391            .set_status(
392                ComponentStatus::Stopped,
393                Some("PostgreSQL replication stopped".to_string()),
394            )
395            .await;
396
397        Ok(())
398    }
399
400    async fn status(&self) -> ComponentStatus {
401        self.base.get_status().await
402    }
403
404    async fn subscribe(
405        &self,
406        settings: drasi_lib::config::SourceSubscriptionSettings,
407    ) -> Result<SubscriptionResponse> {
408        self.base
409            .subscribe_with_bootstrap(&settings, "PostgreSQL")
410            .await
411    }
412
413    fn as_any(&self) -> &dyn std::any::Any {
414        self
415    }
416
417    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
418        self.base.initialize(context).await;
419    }
420
421    async fn set_bootstrap_provider(
422        &self,
423        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
424    ) {
425        self.base.set_bootstrap_provider(provider).await;
426    }
427}
428
429async fn run_replication(
430    source_id: String,
431    config: PostgresSourceConfig,
432    dispatchers: Arc<
433        RwLock<
434            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
435        >,
436    >,
437    status_handle: ComponentStatusHandle,
438) -> Result<()> {
439    info!("Starting replication for source {source_id}");
440
441    let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
442
443    stream.run().await
444}
445
446/// Builder for PostgreSQL source configuration.
447///
448/// Provides a fluent API for constructing PostgreSQL source configurations
449/// with sensible defaults.
450///
451/// # Example
452///
453/// ```rust,ignore
454/// use drasi_source_postgres::PostgresReplicationSource;
455///
456/// let source = PostgresReplicationSource::builder("pg-source")
457///     .with_host("db.example.com")
458///     .with_database("production")
459///     .with_user("replication_user")
460///     .with_password("secret")
461///     .with_tables(vec!["users".to_string(), "orders".to_string()])
462///     .with_slot_name("my_slot")
463///     .build()?;
464/// ```
465pub struct PostgresSourceBuilder {
466    id: String,
467    host: String,
468    port: u16,
469    database: String,
470    user: String,
471    password: String,
472    tables: Vec<String>,
473    slot_name: String,
474    publication_name: String,
475    ssl_mode: SslMode,
476    table_keys: Vec<TableKeyConfig>,
477    dispatch_mode: Option<DispatchMode>,
478    dispatch_buffer_capacity: Option<usize>,
479    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
480    auto_start: bool,
481}
482
483impl PostgresSourceBuilder {
484    /// Create a new PostgreSQL source builder with the given ID and default values
485    pub fn new(id: impl Into<String>) -> Self {
486        Self {
487            id: id.into(),
488            host: "localhost".to_string(),
489            port: 5432,
490            database: String::new(),
491            user: String::new(),
492            password: String::new(),
493            tables: Vec::new(),
494            slot_name: "drasi_slot".to_string(),
495            publication_name: "drasi_publication".to_string(),
496            ssl_mode: SslMode::default(),
497            table_keys: Vec::new(),
498            dispatch_mode: None,
499            dispatch_buffer_capacity: None,
500            bootstrap_provider: None,
501            auto_start: true,
502        }
503    }
504
505    /// Set the PostgreSQL host
506    pub fn with_host(mut self, host: impl Into<String>) -> Self {
507        self.host = host.into();
508        self
509    }
510
511    /// Set the PostgreSQL port
512    pub fn with_port(mut self, port: u16) -> Self {
513        self.port = port;
514        self
515    }
516
517    /// Set the database name
518    pub fn with_database(mut self, database: impl Into<String>) -> Self {
519        self.database = database.into();
520        self
521    }
522
523    /// Set the database user
524    pub fn with_user(mut self, user: impl Into<String>) -> Self {
525        self.user = user.into();
526        self
527    }
528
529    /// Set the database password
530    pub fn with_password(mut self, password: impl Into<String>) -> Self {
531        self.password = password.into();
532        self
533    }
534
535    /// Set the tables to replicate
536    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
537        self.tables = tables;
538        self
539    }
540
541    /// Add a table to replicate
542    pub fn add_table(mut self, table: impl Into<String>) -> Self {
543        self.tables.push(table.into());
544        self
545    }
546
547    /// Set the replication slot name
548    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
549        self.slot_name = slot_name.into();
550        self
551    }
552
553    /// Set the publication name
554    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
555        self.publication_name = publication_name.into();
556        self
557    }
558
559    /// Set the SSL mode
560    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
561        self.ssl_mode = ssl_mode;
562        self
563    }
564
565    /// Set the table key configurations
566    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
567        self.table_keys = table_keys;
568        self
569    }
570
571    /// Add a table key configuration
572    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
573        self.table_keys.push(table_key);
574        self
575    }
576
577    /// Set the dispatch mode for this source
578    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
579        self.dispatch_mode = Some(mode);
580        self
581    }
582
583    /// Set the dispatch buffer capacity for this source
584    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
585        self.dispatch_buffer_capacity = Some(capacity);
586        self
587    }
588
589    /// Set the bootstrap provider for this source
590    pub fn with_bootstrap_provider(
591        mut self,
592        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
593    ) -> Self {
594        self.bootstrap_provider = Some(Box::new(provider));
595        self
596    }
597
598    /// Set whether this source should auto-start when DrasiLib starts.
599    ///
600    /// Default is `true`. Set to `false` if this source should only be
601    /// started manually via `start_source()`.
602    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
603        self.auto_start = auto_start;
604        self
605    }
606
607    /// Set the full configuration at once
608    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
609        self.host = config.host;
610        self.port = config.port;
611        self.database = config.database;
612        self.user = config.user;
613        self.password = config.password;
614        self.tables = config.tables;
615        self.slot_name = config.slot_name;
616        self.publication_name = config.publication_name;
617        self.ssl_mode = config.ssl_mode;
618        self.table_keys = config.table_keys;
619        self
620    }
621
622    /// Build the PostgreSQL replication source
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the source cannot be constructed.
627    pub fn build(self) -> Result<PostgresReplicationSource> {
628        let config = PostgresSourceConfig {
629            host: self.host,
630            port: self.port,
631            database: self.database,
632            user: self.user,
633            password: self.password,
634            tables: self.tables,
635            slot_name: self.slot_name,
636            publication_name: self.publication_name,
637            ssl_mode: self.ssl_mode,
638            table_keys: self.table_keys,
639        };
640
641        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
642        if let Some(mode) = self.dispatch_mode {
643            params = params.with_dispatch_mode(mode);
644        }
645        if let Some(capacity) = self.dispatch_buffer_capacity {
646            params = params.with_dispatch_buffer_capacity(capacity);
647        }
648        if let Some(provider) = self.bootstrap_provider {
649            params = params.with_bootstrap_provider(provider);
650        }
651
652        Ok(PostgresReplicationSource {
653            base: SourceBase::new(params)?,
654            config,
655        })
656    }
657}
658
659#[cfg(test)]
660mod tests {
661    use super::*;
662
663    mod construction {
664        use super::*;
665
666        #[test]
667        fn test_builder_with_valid_config() {
668            let source = PostgresSourceBuilder::new("test-source")
669                .with_database("testdb")
670                .with_user("testuser")
671                .build();
672            assert!(source.is_ok());
673        }
674
675        #[test]
676        fn test_builder_with_custom_config() {
677            let source = PostgresSourceBuilder::new("pg-source")
678                .with_host("192.168.1.100")
679                .with_port(5433)
680                .with_database("production")
681                .with_user("admin")
682                .with_password("secret")
683                .build()
684                .unwrap();
685            assert_eq!(source.id(), "pg-source");
686        }
687
688        #[test]
689        fn test_with_dispatch_creates_source() {
690            let config = PostgresSourceConfig {
691                host: "localhost".to_string(),
692                port: 5432,
693                database: "testdb".to_string(),
694                user: "testuser".to_string(),
695                password: String::new(),
696                tables: Vec::new(),
697                slot_name: "drasi_slot".to_string(),
698                publication_name: "drasi_publication".to_string(),
699                ssl_mode: SslMode::default(),
700                table_keys: Vec::new(),
701            };
702            let source = PostgresReplicationSource::with_dispatch(
703                "dispatch-source",
704                config,
705                Some(DispatchMode::Channel),
706                Some(2000),
707            );
708            assert!(source.is_ok());
709            assert_eq!(source.unwrap().id(), "dispatch-source");
710        }
711    }
712
713    mod properties {
714        use super::*;
715
716        #[test]
717        fn test_id_returns_correct_value() {
718            let source = PostgresSourceBuilder::new("my-pg-source")
719                .with_database("db")
720                .with_user("user")
721                .build()
722                .unwrap();
723            assert_eq!(source.id(), "my-pg-source");
724        }
725
726        #[test]
727        fn test_type_name_returns_postgres() {
728            let source = PostgresSourceBuilder::new("test")
729                .with_database("db")
730                .with_user("user")
731                .build()
732                .unwrap();
733            assert_eq!(source.type_name(), "postgres");
734        }
735
736        #[test]
737        fn test_properties_contains_connection_info() {
738            let source = PostgresSourceBuilder::new("test")
739                .with_host("db.example.com")
740                .with_port(5433)
741                .with_database("mydb")
742                .with_user("app_user")
743                .with_password("secret")
744                .with_tables(vec!["users".to_string()])
745                .build()
746                .unwrap();
747            let props = source.properties();
748
749            assert_eq!(
750                props.get("host"),
751                Some(&serde_json::Value::String("db.example.com".to_string()))
752            );
753            assert_eq!(
754                props.get("port"),
755                Some(&serde_json::Value::Number(5433.into()))
756            );
757            assert_eq!(
758                props.get("database"),
759                Some(&serde_json::Value::String("mydb".to_string()))
760            );
761            assert_eq!(
762                props.get("user"),
763                Some(&serde_json::Value::String("app_user".to_string()))
764            );
765        }
766
767        #[test]
768        fn test_properties_includes_password() {
769            let source = PostgresSourceBuilder::new("test")
770                .with_database("db")
771                .with_user("user")
772                .with_password("super_secret_password")
773                .build()
774                .unwrap();
775            let props = source.properties();
776
777            // Password must be preserved for config persistence roundtrip
778            assert_eq!(
779                props.get("password"),
780                Some(&serde_json::Value::String(
781                    "super_secret_password".to_string()
782                ))
783            );
784        }
785
786        #[test]
787        fn test_properties_includes_tables() {
788            let source = PostgresSourceBuilder::new("test")
789                .with_database("db")
790                .with_user("user")
791                .with_tables(vec!["users".to_string(), "orders".to_string()])
792                .build()
793                .unwrap();
794            let props = source.properties();
795
796            let tables = props.get("tables").unwrap().as_array().unwrap();
797            assert_eq!(tables.len(), 2);
798            assert_eq!(tables[0], "users");
799            assert_eq!(tables[1], "orders");
800        }
801    }
802
803    mod lifecycle {
804        use super::*;
805
806        /// A test secret resolver that returns a fixed value for any secret name.
807        struct TestSecretResolver;
808
809        impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
810            fn resolve_to_string(
811                &self,
812                value: &drasi_plugin_sdk::ConfigValue<String>,
813            ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
814                match value {
815                    drasi_plugin_sdk::ConfigValue::Secret { name } => {
816                        Ok(format!("resolved-secret-{name}"))
817                    }
818                    _ => Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType),
819                }
820            }
821        }
822
823        fn ensure_test_secret_resolver() {
824            let _ = drasi_plugin_sdk::resolver::register_secret_resolver(std::sync::Arc::new(
825                TestSecretResolver,
826            ));
827        }
828
829        #[tokio::test]
830        async fn test_descriptor_preserves_secret_envelope() {
831            use crate::descriptor::PostgresSourceDescriptor;
832            use drasi_lib::sources::Source;
833            use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;
834
835            ensure_test_secret_resolver();
836
837            let config_json = serde_json::json!({
838                "host": "db.example.com",
839                "port": 5432,
840                "database": "mydb",
841                "user": "app_user",
842                "password": {
843                    "kind": "Secret",
844                    "name": "db-password"
845                },
846                "tables": ["users"],
847                "slotName": "drasi_slot",
848                "publicationName": "drasi_pub"
849            });
850
851            let descriptor = PostgresSourceDescriptor;
852            let source = descriptor
853                .create_source("pg-secret-test", &config_json, true)
854                .await
855                .expect("descriptor should create source");
856
857            let props = source.properties();
858
859            // Password must be the Secret envelope, NOT the resolved value
860            let password = props.get("password").expect("password must be present");
861            assert!(
862                password.is_object(),
863                "password should be Secret envelope, got: {password}"
864            );
865            assert_eq!(
866                password.get("kind").and_then(|v| v.as_str()),
867                Some("Secret"),
868                "envelope kind must be Secret"
869            );
870            assert_eq!(
871                password.get("name").and_then(|v| v.as_str()),
872                Some("db-password"),
873                "secret name must be preserved"
874            );
875
876            // Resolved value must NOT leak into persisted properties
877            let props_str = serde_json::to_string(&props).unwrap();
878            assert!(
879                !props_str.contains("resolved-secret-db-password"),
880                "resolved secret must not appear in properties"
881            );
882
883            // Keys must be camelCase (from raw_config)
884            assert!(
885                props.contains_key("slotName"),
886                "expected camelCase 'slotName', got keys: {:?}",
887                props.keys().collect::<Vec<_>>()
888            );
889            assert!(
890                props.contains_key("publicationName"),
891                "expected camelCase 'publicationName'"
892            );
893        }
894
895        #[tokio::test]
896        async fn test_initial_status_is_stopped() {
897            let source = PostgresSourceBuilder::new("test")
898                .with_database("db")
899                .with_user("user")
900                .build()
901                .unwrap();
902            assert_eq!(source.status().await, ComponentStatus::Stopped);
903        }
904
905        #[test]
906        fn test_builder_fallback_produces_camel_case() {
907            use drasi_lib::sources::Source;
908
909            let source = PostgresSourceBuilder::new("pg-fallback")
910                .with_host("myhost.example.com")
911                .with_port(5433)
912                .with_database("mydb")
913                .with_user("admin")
914                .with_password("secret123")
915                .with_ssl_mode(SslMode::Require)
916                .with_slot_name("custom_slot")
917                .with_publication_name("custom_pub")
918                .build()
919                .unwrap();
920
921            let props = source.properties();
922
923            // Must use camelCase keys (DTO serialization)
924            assert!(
925                props.contains_key("slotName"),
926                "expected camelCase 'slotName', got keys: {:?}",
927                props.keys().collect::<Vec<_>>()
928            );
929            assert!(
930                props.contains_key("publicationName"),
931                "expected camelCase 'publicationName'"
932            );
933            assert!(
934                props.contains_key("sslMode"),
935                "expected camelCase 'sslMode'"
936            );
937
938            // Must NOT have snake_case keys
939            assert!(
940                !props.contains_key("slot_name"),
941                "should not have snake_case 'slot_name'"
942            );
943            assert!(
944                !props.contains_key("publication_name"),
945                "should not have snake_case 'publication_name'"
946            );
947
948            // Values should be correct
949            assert_eq!(
950                props.get("host").and_then(|v| v.as_str()),
951                Some("myhost.example.com")
952            );
953            assert_eq!(props.get("port").and_then(|v| v.as_u64()), Some(5433));
954            assert_eq!(props.get("database").and_then(|v| v.as_str()), Some("mydb"));
955            assert_eq!(
956                props.get("password").and_then(|v| v.as_str()),
957                Some("secret123")
958            );
959        }
960    }
961
962    mod builder {
963        use super::*;
964
965        #[test]
966        fn test_postgres_builder_defaults() {
967            let source = PostgresSourceBuilder::new("test").build().unwrap();
968            assert_eq!(source.config.host, "localhost");
969            assert_eq!(source.config.port, 5432);
970            assert_eq!(source.config.slot_name, "drasi_slot");
971            assert_eq!(source.config.publication_name, "drasi_publication");
972        }
973
974        #[test]
975        fn test_postgres_builder_custom_values() {
976            let source = PostgresSourceBuilder::new("test")
977                .with_host("db.example.com")
978                .with_port(5433)
979                .with_database("production")
980                .with_user("app_user")
981                .with_password("secret")
982                .with_tables(vec!["users".to_string(), "orders".to_string()])
983                .build()
984                .unwrap();
985
986            assert_eq!(source.config.host, "db.example.com");
987            assert_eq!(source.config.port, 5433);
988            assert_eq!(source.config.database, "production");
989            assert_eq!(source.config.user, "app_user");
990            assert_eq!(source.config.password, "secret");
991            assert_eq!(source.config.tables.len(), 2);
992            assert_eq!(source.config.tables[0], "users");
993            assert_eq!(source.config.tables[1], "orders");
994        }
995
996        #[test]
997        fn test_builder_add_table() {
998            let source = PostgresSourceBuilder::new("test")
999                .add_table("table1")
1000                .add_table("table2")
1001                .add_table("table3")
1002                .build()
1003                .unwrap();
1004
1005            assert_eq!(source.config.tables.len(), 3);
1006            assert_eq!(source.config.tables[0], "table1");
1007            assert_eq!(source.config.tables[1], "table2");
1008            assert_eq!(source.config.tables[2], "table3");
1009        }
1010
1011        #[test]
1012        fn test_builder_slot_and_publication() {
1013            let source = PostgresSourceBuilder::new("test")
1014                .with_slot_name("custom_slot")
1015                .with_publication_name("custom_pub")
1016                .build()
1017                .unwrap();
1018
1019            assert_eq!(source.config.slot_name, "custom_slot");
1020            assert_eq!(source.config.publication_name, "custom_pub");
1021        }
1022
1023        #[test]
1024        fn test_builder_id() {
1025            let source = PostgresReplicationSource::builder("my-pg-source")
1026                .with_database("db")
1027                .with_user("user")
1028                .build()
1029                .unwrap();
1030
1031            assert_eq!(source.base.id, "my-pg-source");
1032        }
1033    }
1034
1035    mod config {
1036        use super::*;
1037
1038        #[test]
1039        fn test_config_serialization() {
1040            let config = PostgresSourceConfig {
1041                host: "localhost".to_string(),
1042                port: 5432,
1043                database: "testdb".to_string(),
1044                user: "testuser".to_string(),
1045                password: String::new(),
1046                tables: Vec::new(),
1047                slot_name: "drasi_slot".to_string(),
1048                publication_name: "drasi_publication".to_string(),
1049                ssl_mode: SslMode::default(),
1050                table_keys: Vec::new(),
1051            };
1052
1053            let json = serde_json::to_string(&config).unwrap();
1054            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
1055
1056            assert_eq!(config, deserialized);
1057        }
1058
1059        #[test]
1060        fn test_config_deserialization_with_required_fields() {
1061            let json = r#"{
1062                "database": "mydb",
1063                "user": "myuser"
1064            }"#;
1065            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1066
1067            assert_eq!(config.database, "mydb");
1068            assert_eq!(config.user, "myuser");
1069            assert_eq!(config.host, "localhost"); // default
1070            assert_eq!(config.port, 5432); // default
1071            assert_eq!(config.slot_name, "drasi_slot"); // default
1072        }
1073
1074        #[test]
1075        fn test_config_deserialization_full() {
1076            let json = r#"{
1077                "host": "db.prod.internal",
1078                "port": 5433,
1079                "database": "production",
1080                "user": "replication_user",
1081                "password": "secret",
1082                "tables": ["accounts", "transactions"],
1083                "slot_name": "prod_slot",
1084                "publication_name": "prod_publication"
1085            }"#;
1086            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1087
1088            assert_eq!(config.host, "db.prod.internal");
1089            assert_eq!(config.port, 5433);
1090            assert_eq!(config.database, "production");
1091            assert_eq!(config.user, "replication_user");
1092            assert_eq!(config.password, "secret");
1093            assert_eq!(config.tables, vec!["accounts", "transactions"]);
1094            assert_eq!(config.slot_name, "prod_slot");
1095            assert_eq!(config.publication_name, "prod_publication");
1096        }
1097    }
1098}
1099
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Invokes
/// `drasi_plugin_sdk::export_plugin!` with the plugin id `postgres-source`,
/// `descriptor::PostgresSourceDescriptor` as the single source descriptor, and
/// empty reaction and bootstrap descriptor lists. The core, lib and plugin
/// versions are all taken from this crate's `CARGO_PKG_VERSION` at compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);