Skip to main content

drasi_source_postgres/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unexpected_cfgs)]
16
17//! PostgreSQL Replication Source Plugin for Drasi
18//!
19//! This plugin captures data changes from PostgreSQL databases using logical replication.
20//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
21//! messages in real-time, converting them to Drasi source change events.
22//!
23//! # Prerequisites
24//!
25//! Before using this source, you must configure PostgreSQL for logical replication:
26//!
27//! 1. **Enable logical replication** in `postgresql.conf`:
28//!    ```text
29//!    wal_level = logical
30//!    max_replication_slots = 10
31//!    max_wal_senders = 10
32//!    ```
33//!
34//! 2. **Create a publication** for the tables you want to monitor:
35//!    ```sql
36//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
37//!    ```
38//!
39//! 3. **Create a replication slot** (optional - the source can create one automatically):
40//!    ```sql
41//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
42//!    ```
43//!
44//! 4. **Grant replication permissions** to the database user:
45//!    ```sql
46//!    ALTER ROLE drasi_user REPLICATION;
47//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
48//!    ```
49//!
50//! # Architecture
51//!
52//! The source has two main components:
53//!
54//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
55//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
56//!   ensure consistency.
57//!
58//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
59//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
60//!
61//! # Configuration
62//!
63//! | Field | Type | Default | Description |
64//! |-------|------|---------|-------------|
65//! | `host` | string | `"localhost"` | PostgreSQL host |
66//! | `port` | u16 | `5432` | PostgreSQL port |
67//! | `database` | string | *required* | Database name |
68//! | `user` | string | *required* | Database user (must have replication permission) |
69//! | `password` | string | `""` | Database password |
70//! | `tables` | string[] | `[]` | Tables to replicate |
71//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
72//! | `publication_name` | string | `"drasi_publication"` | Publication name |
73//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
74//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
75//!
76//! # Example Configuration (YAML)
77//!
78//! ```yaml
79//! source_type: postgres
80//! properties:
81//!   host: db.example.com
82//!   port: 5432
83//!   database: production
84//!   user: replication_user
85//!   password: secret
86//!   tables:
87//!     - users
88//!     - orders
89//!   slot_name: drasi_slot
90//!   publication_name: drasi_publication
91//!   table_keys:
92//!     - table: users
93//!       key_columns: [id]
94//!     - table: orders
95//!       key_columns: [order_id]
96//! ```
97//!
98//! # Data Format
99//!
100//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
101//! Each row change is mapped as follows:
102//!
103//! ## Node Mapping
104//!
105//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
106//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
107//! - **Properties**: All columns from the row (column names become property keys)
108//!
109//! ## WAL Message to SourceChange
110//!
111//! | WAL Operation | SourceChange |
112//! |---------------|--------------|
113//! | INSERT | `SourceChange::Insert { element: Node }` |
114//! | UPDATE | `SourceChange::Update { element: Node }` |
115//! | DELETE | `SourceChange::Delete { metadata }` |
116//!
117//! ## Example Mapping
118//!
119//! Given a PostgreSQL table:
120//!
121//! ```sql
122//! CREATE TABLE users (
123//!     id SERIAL PRIMARY KEY,
124//!     name VARCHAR(100),
125//!     email VARCHAR(255),
126//!     age INTEGER
127//! );
128//!
129//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
130//! ```
131//!
132//! Produces a SourceChange equivalent to:
133//!
134//! ```json
135//! {
136//!     "type": "Insert",
137//!     "element": {
138//!         "metadata": {
139//!             "element_id": "public:users:1",
140//!             "source_id": "pg-source",
141//!             "labels": ["users"],
142//!             "effective_from": 1699900000000000
143//!         },
144//!         "properties": {
145//!             "id": 1,
146//!             "name": "Alice",
147//!             "email": "alice@example.com",
148//!             "age": 30
149//!         }
150//!     }
151//! }
152//! ```
153//!
154//! # Usage Example
155//!
156//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! let source = PostgresReplicationSource::builder("pg-source")
//!     .with_host("db.example.com")
//!     .with_database("production")
//!     .with_user("replication_user")
//!     .with_password("secret")
//!     .with_tables(vec!["users".to_string(), "orders".to_string()])
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
170//! ```
171
172pub mod config;
173pub mod connection;
174pub mod decoder;
175pub mod descriptor;
176pub mod protocol;
177pub mod scram;
178pub mod stream;
179pub mod types;
180
181pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
182
183use anyhow::{anyhow, Result};
184use async_trait::async_trait;
185use drasi_lib::schema::{
186    normalize_table_label, NodeSchema, PropertySchema, PropertyType, SourceSchema,
187};
188use log::{debug, error, info};
189use postgres_native_tls::MakeTlsConnector;
190use std::collections::HashMap;
191use std::sync::Arc;
192use tokio::sync::RwLock;
193
194use drasi_lib::channels::{DispatchMode, *};
195use drasi_lib::component_graph::ComponentStatusHandle;
196use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
197use drasi_lib::Source;
198use tracing::Instrument;
199
200/// PostgreSQL replication source that captures changes via logical replication.
201///
202/// This source connects to PostgreSQL using the replication protocol and decodes
203/// WAL messages in real-time, converting them to Drasi source change events.
204///
205/// # Fields
206///
207/// - `base`: Common source functionality (dispatchers, status, lifecycle)
208/// - `config`: PostgreSQL connection and replication configuration
209pub struct PostgresReplicationSource {
210    /// Base source implementation providing common functionality
211    base: SourceBase,
212    /// PostgreSQL source configuration
213    config: PostgresSourceConfig,
214    /// Best-effort cached schema populated from information_schema on start.
215    cached_schema: Arc<std::sync::RwLock<Option<SourceSchema>>>,
216}
217
218fn postgres_type_to_property_type(data_type: &str) -> Option<PropertyType> {
219    match data_type {
220        "smallint" | "integer" | "bigint" => Some(PropertyType::Integer),
221        "real" | "double precision" | "numeric" | "decimal" => Some(PropertyType::Float),
222        "boolean" => Some(PropertyType::Boolean),
223        "timestamp without time zone"
224        | "timestamp with time zone"
225        | "date"
226        | "time without time zone"
227        | "time with time zone" => Some(PropertyType::Timestamp),
228        "json" | "jsonb" => Some(PropertyType::Json),
229        "character" | "character varying" | "text" | "uuid" | "bytea" => Some(PropertyType::String),
230        _ => None,
231    }
232}
233
234async fn introspect_postgres_schema(config: &PostgresSourceConfig) -> Result<Option<SourceSchema>> {
235    if config.tables.is_empty() {
236        return Ok(None);
237    }
238
239    let mut pg_config = tokio_postgres::Config::new();
240    pg_config.host(&config.host);
241    pg_config.port(config.port);
242    pg_config.dbname(&config.database);
243    pg_config.user(&config.user);
244    if !config.password.is_empty() {
245        pg_config.password(&config.password);
246    }
247
248    let client = match config.ssl_mode {
249        SslMode::Require => {
250            pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
251            let tls_connector = native_tls::TlsConnector::builder()
252                .danger_accept_invalid_hostnames(false)
253                .danger_accept_invalid_certs(false)
254                .build()
255                .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
256            let connector = MakeTlsConnector::new(tls_connector);
257
258            debug!("Schema introspection: connecting with SSL (require)");
259            let (client, connection) = pg_config.connect(connector).await?;
260            tokio::spawn(async move {
261                if let Err(e) = connection.await {
262                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
263                }
264            });
265            client
266        }
267        SslMode::Prefer => {
268            // Try TLS first, fall back to plaintext
269            let tls_connector = native_tls::TlsConnector::builder()
270                .danger_accept_invalid_hostnames(false)
271                .danger_accept_invalid_certs(false)
272                .build()
273                .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
274            let connector = MakeTlsConnector::new(tls_connector);
275
276            pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
277            debug!("Schema introspection: connecting with SSL (prefer)");
278            let (client, connection) = pg_config.connect(connector).await?;
279            tokio::spawn(async move {
280                if let Err(e) = connection.await {
281                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
282                }
283            });
284            client
285        }
286        SslMode::Disable => {
287            debug!("Schema introspection: connecting without SSL");
288            let (client, connection) = pg_config.connect(tokio_postgres::NoTls).await?;
289            tokio::spawn(async move {
290                if let Err(e) = connection.await {
291                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
292                }
293            });
294            client
295        }
296    };
297
298    let mut nodes = Vec::new();
299
300    for table in &config.tables {
301        let (schema_name, table_name) = table
302            .split_once('.')
303            .map(|(schema, name)| (schema.to_string(), name.to_string()))
304            .unwrap_or_else(|| ("public".to_string(), table.to_string()));
305
306        let rows = client
307            .query(
308                "SELECT column_name, data_type \
309                 FROM information_schema.columns \
310                 WHERE table_schema = $1 AND table_name = $2 \
311                 ORDER BY ordinal_position",
312                &[&schema_name, &table_name],
313            )
314            .await?;
315
316        let properties = rows
317            .into_iter()
318            .map(|row| PropertySchema {
319                name: row.get::<_, String>(0),
320                data_type: postgres_type_to_property_type(&row.get::<_, String>(1)),
321                description: None,
322            })
323            .collect();
324
325        nodes.push(NodeSchema {
326            label: normalize_table_label(&table_name),
327            properties,
328        });
329    }
330
331    Ok(Some(SourceSchema {
332        nodes,
333        relations: Vec::new(),
334    }))
335}
336
337impl PostgresReplicationSource {
338    /// Create a builder for PostgresReplicationSource
339    ///
340    /// # Example
341    ///
342    /// ```rust,ignore
343    /// use drasi_source_postgres::PostgresReplicationSource;
344    ///
345    /// let source = PostgresReplicationSource::builder("pg-source")
346    ///     .with_host("db.example.com")
347    ///     .with_database("production")
348    ///     .with_user("replication_user")
349    ///     .with_password("secret")
350    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
351    ///     .with_bootstrap_provider(my_provider)
352    ///     .build()?;
353    /// ```
354    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
355        PostgresSourceBuilder::new(id)
356    }
357
358    /// Create a new PostgreSQL replication source.
359    ///
360    /// The event channel is automatically injected when the source is added
361    /// to DrasiLib via `add_source()`.
362    ///
363    /// # Arguments
364    ///
365    /// * `id` - Unique identifier for this source instance
366    /// * `config` - PostgreSQL source configuration
367    ///
368    /// # Returns
369    ///
370    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the base source cannot be initialized.
375    ///
376    /// # Example
377    ///
378    /// ```rust,ignore
379    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
380    ///
381    /// let config = PostgresSourceBuilder::new()
382    ///     .with_host("db.example.com")
383    ///     .with_database("mydb")
384    ///     .with_user("replication_user")
385    ///     .build();
386    ///
387    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
388    /// ```
389    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
390        let id = id.into();
391        let params = SourceBaseParams::new(id);
392        Ok(Self {
393            base: SourceBase::new(params)?,
394            config,
395            cached_schema: Arc::new(std::sync::RwLock::new(None)),
396        })
397    }
398
399    /// Create a new PostgreSQL replication source with custom dispatch settings
400    ///
401    /// The event channel is automatically injected when the source is added
402    /// to DrasiLib via `add_source()`.
403    pub fn with_dispatch(
404        id: impl Into<String>,
405        config: PostgresSourceConfig,
406        dispatch_mode: Option<DispatchMode>,
407        dispatch_buffer_capacity: Option<usize>,
408    ) -> Result<Self> {
409        let id = id.into();
410        let mut params = SourceBaseParams::new(id);
411        if let Some(mode) = dispatch_mode {
412            params = params.with_dispatch_mode(mode);
413        }
414        if let Some(capacity) = dispatch_buffer_capacity {
415            params = params.with_dispatch_buffer_capacity(capacity);
416        }
417        Ok(Self {
418            base: SourceBase::new(params)?,
419            config,
420            cached_schema: Arc::new(std::sync::RwLock::new(None)),
421        })
422    }
423}
424
425#[async_trait]
426impl Source for PostgresReplicationSource {
427    fn id(&self) -> &str {
428        &self.base.id
429    }
430
431    fn type_name(&self) -> &str {
432        "postgres"
433    }
434
435    fn properties(&self) -> HashMap<String, serde_json::Value> {
436        use crate::descriptor::PostgresSourceConfigDto;
437
438        self.base
439            .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
440    }
441
442    fn auto_start(&self) -> bool {
443        self.base.get_auto_start()
444    }
445
446    fn describe_schema(&self) -> Option<SourceSchema> {
447        self.cached_schema
448            .read()
449            .ok()
450            .and_then(|schema| schema.clone())
451            .or_else(|| {
452                if self.config.tables.is_empty() {
453                    None
454                } else {
455                    Some(SourceSchema {
456                        nodes: self
457                            .config
458                            .tables
459                            .iter()
460                            .map(|table| NodeSchema::new(normalize_table_label(table)))
461                            .collect(),
462                        relations: Vec::new(),
463                    })
464                }
465            })
466    }
467
468    async fn start(&self) -> Result<()> {
469        if self.base.get_status().await == ComponentStatus::Running {
470            return Ok(());
471        }
472
473        self.base.set_status(ComponentStatus::Starting, None).await;
474        info!("Starting PostgreSQL replication source: {}", self.base.id);
475
476        match introspect_postgres_schema(&self.config).await {
477            Ok(Some(schema)) => {
478                if let Ok(mut cached) = self.cached_schema.write() {
479                    *cached = Some(schema);
480                }
481            }
482            Ok(None) => {}
483            Err(e) => {
484                log::warn!(
485                    "Failed to introspect PostgreSQL schema for '{}': {e}",
486                    self.base.id
487                );
488            }
489        }
490
491        let config = self.config.clone();
492        let source_id = self.base.id.clone();
493        let dispatchers = self.base.dispatchers.clone();
494        let reporter = self.base.status_handle();
495
496        // Get instance_id from context for log routing isolation
497        let instance_id = self
498            .base
499            .context()
500            .await
501            .map(|c| c.instance_id)
502            .unwrap_or_default();
503
504        // Create span for spawned task so log::info!, log::error! etc are routed
505        let source_id_for_span = source_id.clone();
506        let span = tracing::info_span!(
507            "postgres_replication_task",
508            instance_id = %instance_id,
509            component_id = %source_id_for_span,
510            component_type = "source"
511        );
512
513        let task = tokio::spawn(
514            async move {
515                if let Err(e) =
516                    run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
517                {
518                    error!("Replication task failed for {source_id}: {e}");
519                    reporter
520                        .set_status(
521                            ComponentStatus::Error,
522                            Some(format!("Replication failed: {e}")),
523                        )
524                        .await;
525                }
526            }
527            .instrument(span),
528        );
529
530        *self.base.task_handle.write().await = Some(task);
531        self.base
532            .set_status(
533                ComponentStatus::Running,
534                Some("PostgreSQL replication started".to_string()),
535            )
536            .await;
537
538        Ok(())
539    }
540
541    async fn stop(&self) -> Result<()> {
542        if self.base.get_status().await != ComponentStatus::Running {
543            return Ok(());
544        }
545
546        info!("Stopping PostgreSQL replication source: {}", self.base.id);
547
548        self.base.set_status(ComponentStatus::Stopping, None).await;
549
550        // Cancel the replication task
551        if let Some(task) = self.base.task_handle.write().await.take() {
552            task.abort();
553        }
554
555        // Clear cached schema so a subsequent start() re-introspects
556        if let Ok(mut cached) = self.cached_schema.write() {
557            *cached = None;
558        }
559
560        self.base
561            .set_status(
562                ComponentStatus::Stopped,
563                Some("PostgreSQL replication stopped".to_string()),
564            )
565            .await;
566
567        Ok(())
568    }
569
570    async fn status(&self) -> ComponentStatus {
571        self.base.get_status().await
572    }
573
574    async fn subscribe(
575        &self,
576        settings: drasi_lib::config::SourceSubscriptionSettings,
577    ) -> Result<SubscriptionResponse> {
578        self.base
579            .subscribe_with_bootstrap(&settings, "PostgreSQL")
580            .await
581    }
582
583    fn as_any(&self) -> &dyn std::any::Any {
584        self
585    }
586
587    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
588        self.base.initialize(context).await;
589    }
590
591    async fn set_bootstrap_provider(
592        &self,
593        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
594    ) {
595        self.base.set_bootstrap_provider(provider).await;
596    }
597}
598
599async fn run_replication(
600    source_id: String,
601    config: PostgresSourceConfig,
602    dispatchers: Arc<
603        RwLock<
604            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
605        >,
606    >,
607    status_handle: ComponentStatusHandle,
608) -> Result<()> {
609    info!("Starting replication for source {source_id}");
610
611    let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
612
613    stream.run().await
614}
615
616/// Builder for PostgreSQL source configuration.
617///
618/// Provides a fluent API for constructing PostgreSQL source configurations
619/// with sensible defaults.
620///
621/// # Example
622///
623/// ```rust,ignore
624/// use drasi_source_postgres::PostgresReplicationSource;
625///
626/// let source = PostgresReplicationSource::builder("pg-source")
627///     .with_host("db.example.com")
628///     .with_database("production")
629///     .with_user("replication_user")
630///     .with_password("secret")
631///     .with_tables(vec!["users".to_string(), "orders".to_string()])
632///     .with_slot_name("my_slot")
633///     .build()?;
634/// ```
635pub struct PostgresSourceBuilder {
636    id: String,
637    host: String,
638    port: u16,
639    database: String,
640    user: String,
641    password: String,
642    tables: Vec<String>,
643    slot_name: String,
644    publication_name: String,
645    ssl_mode: SslMode,
646    table_keys: Vec<TableKeyConfig>,
647    dispatch_mode: Option<DispatchMode>,
648    dispatch_buffer_capacity: Option<usize>,
649    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
650    auto_start: bool,
651}
652
653impl PostgresSourceBuilder {
654    /// Create a new PostgreSQL source builder with the given ID and default values
655    pub fn new(id: impl Into<String>) -> Self {
656        Self {
657            id: id.into(),
658            host: "localhost".to_string(),
659            port: 5432,
660            database: String::new(),
661            user: String::new(),
662            password: String::new(),
663            tables: Vec::new(),
664            slot_name: "drasi_slot".to_string(),
665            publication_name: "drasi_publication".to_string(),
666            ssl_mode: SslMode::default(),
667            table_keys: Vec::new(),
668            dispatch_mode: None,
669            dispatch_buffer_capacity: None,
670            bootstrap_provider: None,
671            auto_start: true,
672        }
673    }
674
675    /// Set the PostgreSQL host
676    pub fn with_host(mut self, host: impl Into<String>) -> Self {
677        self.host = host.into();
678        self
679    }
680
681    /// Set the PostgreSQL port
682    pub fn with_port(mut self, port: u16) -> Self {
683        self.port = port;
684        self
685    }
686
687    /// Set the database name
688    pub fn with_database(mut self, database: impl Into<String>) -> Self {
689        self.database = database.into();
690        self
691    }
692
693    /// Set the database user
694    pub fn with_user(mut self, user: impl Into<String>) -> Self {
695        self.user = user.into();
696        self
697    }
698
699    /// Set the database password
700    pub fn with_password(mut self, password: impl Into<String>) -> Self {
701        self.password = password.into();
702        self
703    }
704
705    /// Set the tables to replicate
706    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
707        self.tables = tables;
708        self
709    }
710
711    /// Add a table to replicate
712    pub fn add_table(mut self, table: impl Into<String>) -> Self {
713        self.tables.push(table.into());
714        self
715    }
716
717    /// Set the replication slot name
718    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
719        self.slot_name = slot_name.into();
720        self
721    }
722
723    /// Set the publication name
724    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
725        self.publication_name = publication_name.into();
726        self
727    }
728
729    /// Set the SSL mode
730    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
731        self.ssl_mode = ssl_mode;
732        self
733    }
734
735    /// Set the table key configurations
736    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
737        self.table_keys = table_keys;
738        self
739    }
740
741    /// Add a table key configuration
742    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
743        self.table_keys.push(table_key);
744        self
745    }
746
747    /// Set the dispatch mode for this source
748    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
749        self.dispatch_mode = Some(mode);
750        self
751    }
752
753    /// Set the dispatch buffer capacity for this source
754    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
755        self.dispatch_buffer_capacity = Some(capacity);
756        self
757    }
758
759    /// Set the bootstrap provider for this source
760    pub fn with_bootstrap_provider(
761        mut self,
762        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
763    ) -> Self {
764        self.bootstrap_provider = Some(Box::new(provider));
765        self
766    }
767
768    /// Set whether this source should auto-start when DrasiLib starts.
769    ///
770    /// Default is `true`. Set to `false` if this source should only be
771    /// started manually via `start_source()`.
772    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
773        self.auto_start = auto_start;
774        self
775    }
776
777    /// Set the full configuration at once
778    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
779        self.host = config.host;
780        self.port = config.port;
781        self.database = config.database;
782        self.user = config.user;
783        self.password = config.password;
784        self.tables = config.tables;
785        self.slot_name = config.slot_name;
786        self.publication_name = config.publication_name;
787        self.ssl_mode = config.ssl_mode;
788        self.table_keys = config.table_keys;
789        self
790    }
791
792    /// Build the PostgreSQL replication source
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the source cannot be constructed.
797    pub fn build(self) -> Result<PostgresReplicationSource> {
798        let config = PostgresSourceConfig {
799            host: self.host,
800            port: self.port,
801            database: self.database,
802            user: self.user,
803            password: self.password,
804            tables: self.tables,
805            slot_name: self.slot_name,
806            publication_name: self.publication_name,
807            ssl_mode: self.ssl_mode,
808            table_keys: self.table_keys,
809        };
810
811        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
812        if let Some(mode) = self.dispatch_mode {
813            params = params.with_dispatch_mode(mode);
814        }
815        if let Some(capacity) = self.dispatch_buffer_capacity {
816            params = params.with_dispatch_buffer_capacity(capacity);
817        }
818        if let Some(provider) = self.bootstrap_provider {
819            params = params.with_bootstrap_provider(provider);
820        }
821
822        Ok(PostgresReplicationSource {
823            base: SourceBase::new(params)?,
824            config,
825            cached_schema: Arc::new(std::sync::RwLock::new(None)),
826        })
827    }
828}
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833
834    mod construction {
835        use super::*;
836
837        #[test]
838        fn test_builder_with_valid_config() {
839            let source = PostgresSourceBuilder::new("test-source")
840                .with_database("testdb")
841                .with_user("testuser")
842                .build();
843            assert!(source.is_ok());
844        }
845
846        #[test]
847        fn test_builder_with_custom_config() {
848            let source = PostgresSourceBuilder::new("pg-source")
849                .with_host("192.168.1.100")
850                .with_port(5433)
851                .with_database("production")
852                .with_user("admin")
853                .with_password("secret")
854                .build()
855                .unwrap();
856            assert_eq!(source.id(), "pg-source");
857        }
858
859        #[test]
860        fn test_with_dispatch_creates_source() {
861            let config = PostgresSourceConfig {
862                host: "localhost".to_string(),
863                port: 5432,
864                database: "testdb".to_string(),
865                user: "testuser".to_string(),
866                password: String::new(),
867                tables: Vec::new(),
868                slot_name: "drasi_slot".to_string(),
869                publication_name: "drasi_publication".to_string(),
870                ssl_mode: SslMode::default(),
871                table_keys: Vec::new(),
872            };
873            let source = PostgresReplicationSource::with_dispatch(
874                "dispatch-source",
875                config,
876                Some(DispatchMode::Channel),
877                Some(2000),
878            );
879            assert!(source.is_ok());
880            assert_eq!(source.unwrap().id(), "dispatch-source");
881        }
882    }
883
884    mod properties {
885        use super::*;
886
887        #[test]
888        fn test_id_returns_correct_value() {
889            let source = PostgresSourceBuilder::new("my-pg-source")
890                .with_database("db")
891                .with_user("user")
892                .build()
893                .unwrap();
894            assert_eq!(source.id(), "my-pg-source");
895        }
896
897        #[test]
898        fn test_type_name_returns_postgres() {
899            let source = PostgresSourceBuilder::new("test")
900                .with_database("db")
901                .with_user("user")
902                .build()
903                .unwrap();
904            assert_eq!(source.type_name(), "postgres");
905        }
906
907        #[test]
908        fn test_properties_contains_connection_info() {
909            let source = PostgresSourceBuilder::new("test")
910                .with_host("db.example.com")
911                .with_port(5433)
912                .with_database("mydb")
913                .with_user("app_user")
914                .with_password("secret")
915                .with_tables(vec!["users".to_string()])
916                .build()
917                .unwrap();
918            let props = source.properties();
919
920            assert_eq!(
921                props.get("host"),
922                Some(&serde_json::Value::String("db.example.com".to_string()))
923            );
924            assert_eq!(
925                props.get("port"),
926                Some(&serde_json::Value::Number(5433.into()))
927            );
928            assert_eq!(
929                props.get("database"),
930                Some(&serde_json::Value::String("mydb".to_string()))
931            );
932            assert_eq!(
933                props.get("user"),
934                Some(&serde_json::Value::String("app_user".to_string()))
935            );
936        }
937
938        #[test]
939        fn test_properties_includes_password() {
940            let source = PostgresSourceBuilder::new("test")
941                .with_database("db")
942                .with_user("user")
943                .with_password("super_secret_password")
944                .build()
945                .unwrap();
946            let props = source.properties();
947
948            // Password must be preserved for config persistence roundtrip
949            assert_eq!(
950                props.get("password"),
951                Some(&serde_json::Value::String(
952                    "super_secret_password".to_string()
953                ))
954            );
955        }
956
957        #[test]
958        fn test_properties_includes_tables() {
959            let source = PostgresSourceBuilder::new("test")
960                .with_database("db")
961                .with_user("user")
962                .with_tables(vec!["users".to_string(), "orders".to_string()])
963                .build()
964                .unwrap();
965            let props = source.properties();
966
967            let tables = props.get("tables").unwrap().as_array().unwrap();
968            assert_eq!(tables.len(), 2);
969            assert_eq!(tables[0], "users");
970            assert_eq!(tables[1], "orders");
971        }
972
973        #[test]
974        fn test_describe_schema_falls_back_to_configured_tables() {
975            let source = PostgresSourceBuilder::new("test")
976                .with_database("db")
977                .with_user("user")
978                .with_tables(vec!["public.users".to_string(), "orders".to_string()])
979                .build()
980                .unwrap();
981
982            let schema = source
983                .describe_schema()
984                .expect("configured postgres tables should produce fallback schema");
985
986            assert_eq!(schema.nodes.len(), 2);
987            assert!(schema.nodes.iter().any(|node| node.label == "users"));
988            assert!(schema.nodes.iter().any(|node| node.label == "orders"));
989        }
990
991        #[test]
992        fn test_postgres_type_to_property_type_integer() {
993            assert_eq!(
994                postgres_type_to_property_type("integer"),
995                Some(PropertyType::Integer)
996            );
997            assert_eq!(
998                postgres_type_to_property_type("bigint"),
999                Some(PropertyType::Integer)
1000            );
1001            assert_eq!(
1002                postgres_type_to_property_type("smallint"),
1003                Some(PropertyType::Integer)
1004            );
1005        }
1006
1007        #[test]
1008        fn test_postgres_type_to_property_type_float() {
1009            assert_eq!(
1010                postgres_type_to_property_type("double precision"),
1011                Some(PropertyType::Float)
1012            );
1013            assert_eq!(
1014                postgres_type_to_property_type("real"),
1015                Some(PropertyType::Float)
1016            );
1017            assert_eq!(
1018                postgres_type_to_property_type("numeric"),
1019                Some(PropertyType::Float)
1020            );
1021            assert_eq!(
1022                postgres_type_to_property_type("decimal"),
1023                Some(PropertyType::Float)
1024            );
1025        }
1026
1027        #[test]
1028        fn test_postgres_type_to_property_type_boolean() {
1029            assert_eq!(
1030                postgres_type_to_property_type("boolean"),
1031                Some(PropertyType::Boolean)
1032            );
1033        }
1034
1035        #[test]
1036        fn test_postgres_type_to_property_type_timestamp() {
1037            assert_eq!(
1038                postgres_type_to_property_type("timestamp with time zone"),
1039                Some(PropertyType::Timestamp)
1040            );
1041            assert_eq!(
1042                postgres_type_to_property_type("timestamp without time zone"),
1043                Some(PropertyType::Timestamp)
1044            );
1045            assert_eq!(
1046                postgres_type_to_property_type("date"),
1047                Some(PropertyType::Timestamp)
1048            );
1049        }
1050
1051        #[test]
1052        fn test_postgres_type_to_property_type_json() {
1053            assert_eq!(
1054                postgres_type_to_property_type("json"),
1055                Some(PropertyType::Json)
1056            );
1057            assert_eq!(
1058                postgres_type_to_property_type("jsonb"),
1059                Some(PropertyType::Json)
1060            );
1061        }
1062
1063        #[test]
1064        fn test_postgres_type_to_property_type_string() {
1065            assert_eq!(
1066                postgres_type_to_property_type("character varying"),
1067                Some(PropertyType::String)
1068            );
1069            assert_eq!(
1070                postgres_type_to_property_type("text"),
1071                Some(PropertyType::String)
1072            );
1073            assert_eq!(
1074                postgres_type_to_property_type("uuid"),
1075                Some(PropertyType::String)
1076            );
1077        }
1078
1079        #[test]
1080        fn test_postgres_type_to_property_type_unknown_returns_none() {
1081            assert_eq!(postgres_type_to_property_type("point"), None);
1082            assert_eq!(postgres_type_to_property_type("polygon"), None);
1083            assert_eq!(postgres_type_to_property_type("cidr"), None);
1084        }
1085    }
1086
1087    mod lifecycle {
1088        use super::*;
1089
1090        /// A test secret resolver that returns a fixed value for any secret name.
1091        struct TestSecretResolver;
1092
1093        #[async_trait::async_trait]
1094        impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
1095            async fn resolve_to_string(
1096                &self,
1097                value: &drasi_plugin_sdk::ConfigValue<String>,
1098            ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
1099                match value {
1100                    drasi_plugin_sdk::ConfigValue::Secret { name } => {
1101                        Ok(format!("resolved-secret-{name}"))
1102                    }
1103                    _ => Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType),
1104                }
1105            }
1106        }
1107
1108        fn ensure_test_secret_resolver() {
1109            drasi_plugin_sdk::resolver::register_secret_resolver(std::sync::Arc::new(
1110                TestSecretResolver,
1111            ));
1112        }
1113
1114        #[tokio::test]
1115        async fn test_descriptor_preserves_secret_envelope() {
1116            use crate::descriptor::PostgresSourceDescriptor;
1117            use drasi_lib::sources::Source;
1118            use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;
1119
1120            ensure_test_secret_resolver();
1121
1122            let config_json = serde_json::json!({
1123                "host": "db.example.com",
1124                "port": 5432,
1125                "database": "mydb",
1126                "user": "app_user",
1127                "password": {
1128                    "kind": "Secret",
1129                    "name": "db-password"
1130                },
1131                "tables": ["users"],
1132                "slotName": "drasi_slot",
1133                "publicationName": "drasi_pub"
1134            });
1135
1136            let descriptor = PostgresSourceDescriptor;
1137            let source = descriptor
1138                .create_source("pg-secret-test", &config_json, true)
1139                .await
1140                .expect("descriptor should create source");
1141
1142            let props = source.properties();
1143
1144            // Password must be the Secret envelope, NOT the resolved value
1145            let password = props.get("password").expect("password must be present");
1146            assert!(
1147                password.is_object(),
1148                "password should be Secret envelope, got: {password}"
1149            );
1150            assert_eq!(
1151                password.get("kind").and_then(|v| v.as_str()),
1152                Some("Secret"),
1153                "envelope kind must be Secret"
1154            );
1155            assert_eq!(
1156                password.get("name").and_then(|v| v.as_str()),
1157                Some("db-password"),
1158                "secret name must be preserved"
1159            );
1160
1161            // Resolved value must NOT leak into persisted properties
1162            let props_str = serde_json::to_string(&props).unwrap();
1163            assert!(
1164                !props_str.contains("resolved-secret-db-password"),
1165                "resolved secret must not appear in properties"
1166            );
1167
1168            // Keys must be camelCase (from raw_config)
1169            assert!(
1170                props.contains_key("slotName"),
1171                "expected camelCase 'slotName', got keys: {:?}",
1172                props.keys().collect::<Vec<_>>()
1173            );
1174            assert!(
1175                props.contains_key("publicationName"),
1176                "expected camelCase 'publicationName'"
1177            );
1178        }
1179
1180        #[tokio::test]
1181        async fn test_initial_status_is_stopped() {
1182            let source = PostgresSourceBuilder::new("test")
1183                .with_database("db")
1184                .with_user("user")
1185                .build()
1186                .unwrap();
1187            assert_eq!(source.status().await, ComponentStatus::Stopped);
1188        }
1189
1190        #[test]
1191        fn test_builder_fallback_produces_camel_case() {
1192            use drasi_lib::sources::Source;
1193
1194            let source = PostgresSourceBuilder::new("pg-fallback")
1195                .with_host("myhost.example.com")
1196                .with_port(5433)
1197                .with_database("mydb")
1198                .with_user("admin")
1199                .with_password("secret123")
1200                .with_ssl_mode(SslMode::Require)
1201                .with_slot_name("custom_slot")
1202                .with_publication_name("custom_pub")
1203                .build()
1204                .unwrap();
1205
1206            let props = source.properties();
1207
1208            // Must use camelCase keys (DTO serialization)
1209            assert!(
1210                props.contains_key("slotName"),
1211                "expected camelCase 'slotName', got keys: {:?}",
1212                props.keys().collect::<Vec<_>>()
1213            );
1214            assert!(
1215                props.contains_key("publicationName"),
1216                "expected camelCase 'publicationName'"
1217            );
1218            assert!(
1219                props.contains_key("sslMode"),
1220                "expected camelCase 'sslMode'"
1221            );
1222
1223            // Must NOT have snake_case keys
1224            assert!(
1225                !props.contains_key("slot_name"),
1226                "should not have snake_case 'slot_name'"
1227            );
1228            assert!(
1229                !props.contains_key("publication_name"),
1230                "should not have snake_case 'publication_name'"
1231            );
1232
1233            // Values should be correct
1234            assert_eq!(
1235                props.get("host").and_then(|v| v.as_str()),
1236                Some("myhost.example.com")
1237            );
1238            assert_eq!(props.get("port").and_then(|v| v.as_u64()), Some(5433));
1239            assert_eq!(props.get("database").and_then(|v| v.as_str()), Some("mydb"));
1240            assert_eq!(
1241                props.get("password").and_then(|v| v.as_str()),
1242                Some("secret123")
1243            );
1244        }
1245    }
1246
1247    mod builder {
1248        use super::*;
1249
1250        #[test]
1251        fn test_postgres_builder_defaults() {
1252            let source = PostgresSourceBuilder::new("test").build().unwrap();
1253            assert_eq!(source.config.host, "localhost");
1254            assert_eq!(source.config.port, 5432);
1255            assert_eq!(source.config.slot_name, "drasi_slot");
1256            assert_eq!(source.config.publication_name, "drasi_publication");
1257        }
1258
1259        #[test]
1260        fn test_postgres_builder_custom_values() {
1261            let source = PostgresSourceBuilder::new("test")
1262                .with_host("db.example.com")
1263                .with_port(5433)
1264                .with_database("production")
1265                .with_user("app_user")
1266                .with_password("secret")
1267                .with_tables(vec!["users".to_string(), "orders".to_string()])
1268                .build()
1269                .unwrap();
1270
1271            assert_eq!(source.config.host, "db.example.com");
1272            assert_eq!(source.config.port, 5433);
1273            assert_eq!(source.config.database, "production");
1274            assert_eq!(source.config.user, "app_user");
1275            assert_eq!(source.config.password, "secret");
1276            assert_eq!(source.config.tables.len(), 2);
1277            assert_eq!(source.config.tables[0], "users");
1278            assert_eq!(source.config.tables[1], "orders");
1279        }
1280
1281        #[test]
1282        fn test_builder_add_table() {
1283            let source = PostgresSourceBuilder::new("test")
1284                .add_table("table1")
1285                .add_table("table2")
1286                .add_table("table3")
1287                .build()
1288                .unwrap();
1289
1290            assert_eq!(source.config.tables.len(), 3);
1291            assert_eq!(source.config.tables[0], "table1");
1292            assert_eq!(source.config.tables[1], "table2");
1293            assert_eq!(source.config.tables[2], "table3");
1294        }
1295
1296        #[test]
1297        fn test_builder_slot_and_publication() {
1298            let source = PostgresSourceBuilder::new("test")
1299                .with_slot_name("custom_slot")
1300                .with_publication_name("custom_pub")
1301                .build()
1302                .unwrap();
1303
1304            assert_eq!(source.config.slot_name, "custom_slot");
1305            assert_eq!(source.config.publication_name, "custom_pub");
1306        }
1307
1308        #[test]
1309        fn test_builder_id() {
1310            let source = PostgresReplicationSource::builder("my-pg-source")
1311                .with_database("db")
1312                .with_user("user")
1313                .build()
1314                .unwrap();
1315
1316            assert_eq!(source.base.id, "my-pg-source");
1317        }
1318    }
1319
1320    mod config {
1321        use super::*;
1322
1323        #[test]
1324        fn test_config_serialization() {
1325            let config = PostgresSourceConfig {
1326                host: "localhost".to_string(),
1327                port: 5432,
1328                database: "testdb".to_string(),
1329                user: "testuser".to_string(),
1330                password: String::new(),
1331                tables: Vec::new(),
1332                slot_name: "drasi_slot".to_string(),
1333                publication_name: "drasi_publication".to_string(),
1334                ssl_mode: SslMode::default(),
1335                table_keys: Vec::new(),
1336            };
1337
1338            let json = serde_json::to_string(&config).unwrap();
1339            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
1340
1341            assert_eq!(config, deserialized);
1342        }
1343
1344        #[test]
1345        fn test_config_deserialization_with_required_fields() {
1346            let json = r#"{
1347                "database": "mydb",
1348                "user": "myuser"
1349            }"#;
1350            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1351
1352            assert_eq!(config.database, "mydb");
1353            assert_eq!(config.user, "myuser");
1354            assert_eq!(config.host, "localhost"); // default
1355            assert_eq!(config.port, 5432); // default
1356            assert_eq!(config.slot_name, "drasi_slot"); // default
1357        }
1358
1359        #[test]
1360        fn test_config_deserialization_full() {
1361            let json = r#"{
1362                "host": "db.prod.internal",
1363                "port": 5433,
1364                "database": "production",
1365                "user": "replication_user",
1366                "password": "secret",
1367                "tables": ["accounts", "transactions"],
1368                "slot_name": "prod_slot",
1369                "publication_name": "prod_publication"
1370            }"#;
1371            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1372
1373            assert_eq!(config.host, "db.prod.internal");
1374            assert_eq!(config.port, 5433);
1375            assert_eq!(config.database, "production");
1376            assert_eq!(config.user, "replication_user");
1377            assert_eq!(config.password, "secret");
1378            assert_eq!(config.tables, vec!["accounts", "transactions"]);
1379            assert_eq!(config.slot_name, "prod_slot");
1380            assert_eq!(config.publication_name, "prod_publication");
1381        }
1382    }
1383}
1384
// Dynamic plugin entry point.
//
// With the `dynamic-plugin` feature enabled, this exports the crate as the
// `postgres-source` plugin, registering `descriptor::PostgresSourceDescriptor` as its
// only source descriptor (no reaction or bootstrap descriptors).
//
// All three version fields are taken from this crate's own `CARGO_PKG_VERSION`.
// NOTE(review): confirm that `core_version` / `lib_version` are meant to track this
// crate's version rather than the drasi-core / drasi-lib versions it was built against.
//
// These are plain `//` comments on purpose: rustdoc does not document macro
// invocations, so a `///` comment here is dropped and triggers `unused_doc_comments`.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);