Skip to main content

drasi_source_postgres/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! PostgreSQL Replication Source Plugin for Drasi
17//!
18//! This plugin captures data changes from PostgreSQL databases using logical replication.
19//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
20//! messages in real-time, converting them to Drasi source change events.
21//!
22//! # Prerequisites
23//!
24//! Before using this source, you must configure PostgreSQL for logical replication:
25//!
26//! 1. **Enable logical replication** in `postgresql.conf`:
27//!    ```text
28//!    wal_level = logical
29//!    max_replication_slots = 10
30//!    max_wal_senders = 10
31//!    ```
32//!
33//! 2. **Create a publication** for the tables you want to monitor:
34//!    ```sql
35//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
36//!    ```
37//!
38//! 3. **Create a replication slot** (optional - the source can create one automatically):
39//!    ```sql
40//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
41//!    ```
42//!
43//! 4. **Grant replication permissions** to the database user:
44//!    ```sql
45//!    ALTER ROLE drasi_user REPLICATION;
46//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
47//!    ```
48//!
49//! # Architecture
50//!
51//! The source has two main components:
52//!
53//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
54//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
55//!   ensure consistency.
56//!
57//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
58//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
59//!
60//! # Configuration
61//!
62//! | Field | Type | Default | Description |
63//! |-------|------|---------|-------------|
64//! | `host` | string | `"localhost"` | PostgreSQL host |
65//! | `port` | u16 | `5432` | PostgreSQL port |
66//! | `database` | string | *required* | Database name |
67//! | `user` | string | *required* | Database user (must have replication permission) |
68//! | `password` | string | `""` | Database password |
69//! | `tables` | string[] | `[]` | Tables to replicate |
70//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
71//! | `publication_name` | string | `"drasi_publication"` | Publication name |
72//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
73//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
74//!
75//! # Example Configuration (YAML)
76//!
77//! ```yaml
78//! source_type: postgres
79//! properties:
80//!   host: db.example.com
81//!   port: 5432
82//!   database: production
83//!   user: replication_user
84//!   password: secret
85//!   tables:
86//!     - users
87//!     - orders
88//!   slot_name: drasi_slot
89//!   publication_name: drasi_publication
90//!   table_keys:
91//!     - table: users
92//!       key_columns: [id]
93//!     - table: orders
94//!       key_columns: [order_id]
95//! ```
96//!
97//! # Data Format
98//!
99//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
100//! Each row change is mapped as follows:
101//!
102//! ## Node Mapping
103//!
104//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
105//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
106//! - **Properties**: All columns from the row (column names become property keys)
107//!
108//! ## WAL Message to SourceChange
109//!
110//! | WAL Operation | SourceChange |
111//! |---------------|--------------|
112//! | INSERT | `SourceChange::Insert { element: Node }` |
113//! | UPDATE | `SourceChange::Update { element: Node }` |
114//! | DELETE | `SourceChange::Delete { metadata }` |
115//!
116//! ## Example Mapping
117//!
118//! Given a PostgreSQL table:
119//!
120//! ```sql
121//! CREATE TABLE users (
122//!     id SERIAL PRIMARY KEY,
123//!     name VARCHAR(100),
124//!     email VARCHAR(255),
125//!     age INTEGER
126//! );
127//!
128//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
129//! ```
130//!
131//! Produces a SourceChange equivalent to:
132//!
133//! ```json
134//! {
135//!     "type": "Insert",
136//!     "element": {
137//!         "metadata": {
138//!             "element_id": "public:users:1",
139//!             "source_id": "pg-source",
140//!             "labels": ["users"],
141//!             "effective_from": 1699900000000000
142//!         },
143//!         "properties": {
144//!             "id": 1,
145//!             "name": "Alice",
146//!             "email": "alice@example.com",
147//!             "age": 30
148//!         }
149//!     }
150//! }
151//! ```
152//!
153//! # Usage Example
154//!
//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! let source = PostgresReplicationSource::builder("pg-source")
//!     .with_host("db.example.com")
//!     .with_database("production")
//!     .with_user("replication_user")
//!     .with_password("secret")
//!     .with_tables(vec!["users".to_string(), "orders".to_string()])
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
//! ```
170
171pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::{anyhow, Result};
183use async_trait::async_trait;
184use drasi_lib::schema::{
185    normalize_table_label, NodeSchema, PropertySchema, PropertyType, SourceSchema,
186};
187use log::{debug, error, info};
188use postgres_native_tls::MakeTlsConnector;
189use std::collections::HashMap;
190use std::sync::Arc;
191use tokio::sync::RwLock;
192
193use drasi_lib::channels::{DispatchMode, *};
194use drasi_lib::component_graph::ComponentStatusHandle;
195use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
196use drasi_lib::Source;
197use tracing::Instrument;
198
199/// PostgreSQL replication source that captures changes via logical replication.
200///
201/// This source connects to PostgreSQL using the replication protocol and decodes
202/// WAL messages in real-time, converting them to Drasi source change events.
203///
204/// # Fields
205///
206/// - `base`: Common source functionality (dispatchers, status, lifecycle)
207/// - `config`: PostgreSQL connection and replication configuration
208pub struct PostgresReplicationSource {
209    /// Base source implementation providing common functionality
210    base: SourceBase,
211    /// PostgreSQL source configuration
212    config: PostgresSourceConfig,
213    /// Best-effort cached schema populated from information_schema on start.
214    cached_schema: Arc<std::sync::RwLock<Option<SourceSchema>>>,
215}
216
217fn postgres_type_to_property_type(data_type: &str) -> Option<PropertyType> {
218    match data_type {
219        "smallint" | "integer" | "bigint" => Some(PropertyType::Integer),
220        "real" | "double precision" | "numeric" | "decimal" => Some(PropertyType::Float),
221        "boolean" => Some(PropertyType::Boolean),
222        "timestamp without time zone"
223        | "timestamp with time zone"
224        | "date"
225        | "time without time zone"
226        | "time with time zone" => Some(PropertyType::Timestamp),
227        "json" | "jsonb" => Some(PropertyType::Json),
228        "character" | "character varying" | "text" | "uuid" | "bytea" => Some(PropertyType::String),
229        _ => None,
230    }
231}
232
233async fn introspect_postgres_schema(config: &PostgresSourceConfig) -> Result<Option<SourceSchema>> {
234    if config.tables.is_empty() {
235        return Ok(None);
236    }
237
238    let mut pg_config = tokio_postgres::Config::new();
239    pg_config.host(&config.host);
240    pg_config.port(config.port);
241    pg_config.dbname(&config.database);
242    pg_config.user(&config.user);
243    if !config.password.is_empty() {
244        pg_config.password(&config.password);
245    }
246
247    let client = match config.ssl_mode {
248        SslMode::Require => {
249            pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
250            let tls_connector = native_tls::TlsConnector::builder()
251                .danger_accept_invalid_hostnames(false)
252                .danger_accept_invalid_certs(false)
253                .build()
254                .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
255            let connector = MakeTlsConnector::new(tls_connector);
256
257            debug!("Schema introspection: connecting with SSL (require)");
258            let (client, connection) = pg_config.connect(connector).await?;
259            tokio::spawn(async move {
260                if let Err(e) = connection.await {
261                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
262                }
263            });
264            client
265        }
266        SslMode::Prefer => {
267            // Try TLS first, fall back to plaintext
268            let tls_connector = native_tls::TlsConnector::builder()
269                .danger_accept_invalid_hostnames(false)
270                .danger_accept_invalid_certs(false)
271                .build()
272                .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
273            let connector = MakeTlsConnector::new(tls_connector);
274
275            pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
276            debug!("Schema introspection: connecting with SSL (prefer)");
277            let (client, connection) = pg_config.connect(connector).await?;
278            tokio::spawn(async move {
279                if let Err(e) = connection.await {
280                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
281                }
282            });
283            client
284        }
285        SslMode::Disable => {
286            debug!("Schema introspection: connecting without SSL");
287            let (client, connection) = pg_config.connect(tokio_postgres::NoTls).await?;
288            tokio::spawn(async move {
289                if let Err(e) = connection.await {
290                    log::warn!("PostgreSQL schema introspection connection closed: {e}");
291                }
292            });
293            client
294        }
295    };
296
297    let mut nodes = Vec::new();
298
299    for table in &config.tables {
300        let (schema_name, table_name) = table
301            .split_once('.')
302            .map(|(schema, name)| (schema.to_string(), name.to_string()))
303            .unwrap_or_else(|| ("public".to_string(), table.to_string()));
304
305        let rows = client
306            .query(
307                "SELECT column_name, data_type \
308                 FROM information_schema.columns \
309                 WHERE table_schema = $1 AND table_name = $2 \
310                 ORDER BY ordinal_position",
311                &[&schema_name, &table_name],
312            )
313            .await?;
314
315        let properties = rows
316            .into_iter()
317            .map(|row| PropertySchema {
318                name: row.get::<_, String>(0),
319                data_type: postgres_type_to_property_type(&row.get::<_, String>(1)),
320                description: None,
321            })
322            .collect();
323
324        nodes.push(NodeSchema {
325            label: normalize_table_label(&table_name),
326            properties,
327        });
328    }
329
330    Ok(Some(SourceSchema {
331        nodes,
332        relations: Vec::new(),
333    }))
334}
335
336impl PostgresReplicationSource {
337    /// Create a builder for PostgresReplicationSource
338    ///
339    /// # Example
340    ///
341    /// ```rust,ignore
342    /// use drasi_source_postgres::PostgresReplicationSource;
343    ///
344    /// let source = PostgresReplicationSource::builder("pg-source")
345    ///     .with_host("db.example.com")
346    ///     .with_database("production")
347    ///     .with_user("replication_user")
348    ///     .with_password("secret")
349    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
350    ///     .with_bootstrap_provider(my_provider)
351    ///     .build()?;
352    /// ```
353    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
354        PostgresSourceBuilder::new(id)
355    }
356
357    /// Create a new PostgreSQL replication source.
358    ///
359    /// The event channel is automatically injected when the source is added
360    /// to DrasiLib via `add_source()`.
361    ///
362    /// # Arguments
363    ///
364    /// * `id` - Unique identifier for this source instance
365    /// * `config` - PostgreSQL source configuration
366    ///
367    /// # Returns
368    ///
369    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the base source cannot be initialized.
374    ///
375    /// # Example
376    ///
377    /// ```rust,ignore
378    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
379    ///
380    /// let config = PostgresSourceBuilder::new()
381    ///     .with_host("db.example.com")
382    ///     .with_database("mydb")
383    ///     .with_user("replication_user")
384    ///     .build();
385    ///
386    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
387    /// ```
388    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
389        let id = id.into();
390        let params = SourceBaseParams::new(id);
391        Ok(Self {
392            base: SourceBase::new(params)?,
393            config,
394            cached_schema: Arc::new(std::sync::RwLock::new(None)),
395        })
396    }
397
398    /// Create a new PostgreSQL replication source with custom dispatch settings
399    ///
400    /// The event channel is automatically injected when the source is added
401    /// to DrasiLib via `add_source()`.
402    pub fn with_dispatch(
403        id: impl Into<String>,
404        config: PostgresSourceConfig,
405        dispatch_mode: Option<DispatchMode>,
406        dispatch_buffer_capacity: Option<usize>,
407    ) -> Result<Self> {
408        let id = id.into();
409        let mut params = SourceBaseParams::new(id);
410        if let Some(mode) = dispatch_mode {
411            params = params.with_dispatch_mode(mode);
412        }
413        if let Some(capacity) = dispatch_buffer_capacity {
414            params = params.with_dispatch_buffer_capacity(capacity);
415        }
416        Ok(Self {
417            base: SourceBase::new(params)?,
418            config,
419            cached_schema: Arc::new(std::sync::RwLock::new(None)),
420        })
421    }
422}
423
424#[async_trait]
425impl Source for PostgresReplicationSource {
426    fn id(&self) -> &str {
427        &self.base.id
428    }
429
430    fn type_name(&self) -> &str {
431        "postgres"
432    }
433
434    fn properties(&self) -> HashMap<String, serde_json::Value> {
435        use crate::descriptor::PostgresSourceConfigDto;
436
437        self.base
438            .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
439    }
440
441    fn auto_start(&self) -> bool {
442        self.base.get_auto_start()
443    }
444
445    fn describe_schema(&self) -> Option<SourceSchema> {
446        self.cached_schema
447            .read()
448            .ok()
449            .and_then(|schema| schema.clone())
450            .or_else(|| {
451                if self.config.tables.is_empty() {
452                    None
453                } else {
454                    Some(SourceSchema {
455                        nodes: self
456                            .config
457                            .tables
458                            .iter()
459                            .map(|table| NodeSchema::new(normalize_table_label(table)))
460                            .collect(),
461                        relations: Vec::new(),
462                    })
463                }
464            })
465    }
466
467    async fn start(&self) -> Result<()> {
468        if self.base.get_status().await == ComponentStatus::Running {
469            return Ok(());
470        }
471
472        self.base.set_status(ComponentStatus::Starting, None).await;
473        info!("Starting PostgreSQL replication source: {}", self.base.id);
474
475        match introspect_postgres_schema(&self.config).await {
476            Ok(Some(schema)) => {
477                if let Ok(mut cached) = self.cached_schema.write() {
478                    *cached = Some(schema);
479                }
480            }
481            Ok(None) => {}
482            Err(e) => {
483                log::warn!(
484                    "Failed to introspect PostgreSQL schema for '{}': {e}",
485                    self.base.id
486                );
487            }
488        }
489
490        let config = self.config.clone();
491        let source_id = self.base.id.clone();
492        let dispatchers = self.base.dispatchers.clone();
493        let reporter = self.base.status_handle();
494
495        // Get instance_id from context for log routing isolation
496        let instance_id = self
497            .base
498            .context()
499            .await
500            .map(|c| c.instance_id)
501            .unwrap_or_default();
502
503        // Create span for spawned task so log::info!, log::error! etc are routed
504        let source_id_for_span = source_id.clone();
505        let span = tracing::info_span!(
506            "postgres_replication_task",
507            instance_id = %instance_id,
508            component_id = %source_id_for_span,
509            component_type = "source"
510        );
511
512        let task = tokio::spawn(
513            async move {
514                if let Err(e) =
515                    run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
516                {
517                    error!("Replication task failed for {source_id}: {e}");
518                    reporter
519                        .set_status(
520                            ComponentStatus::Error,
521                            Some(format!("Replication failed: {e}")),
522                        )
523                        .await;
524                }
525            }
526            .instrument(span),
527        );
528
529        *self.base.task_handle.write().await = Some(task);
530        self.base
531            .set_status(
532                ComponentStatus::Running,
533                Some("PostgreSQL replication started".to_string()),
534            )
535            .await;
536
537        Ok(())
538    }
539
540    async fn stop(&self) -> Result<()> {
541        if self.base.get_status().await != ComponentStatus::Running {
542            return Ok(());
543        }
544
545        info!("Stopping PostgreSQL replication source: {}", self.base.id);
546
547        self.base.set_status(ComponentStatus::Stopping, None).await;
548
549        // Cancel the replication task
550        if let Some(task) = self.base.task_handle.write().await.take() {
551            task.abort();
552        }
553
554        // Clear cached schema so a subsequent start() re-introspects
555        if let Ok(mut cached) = self.cached_schema.write() {
556            *cached = None;
557        }
558
559        self.base
560            .set_status(
561                ComponentStatus::Stopped,
562                Some("PostgreSQL replication stopped".to_string()),
563            )
564            .await;
565
566        Ok(())
567    }
568
569    async fn status(&self) -> ComponentStatus {
570        self.base.get_status().await
571    }
572
573    async fn subscribe(
574        &self,
575        settings: drasi_lib::config::SourceSubscriptionSettings,
576    ) -> Result<SubscriptionResponse> {
577        self.base
578            .subscribe_with_bootstrap(&settings, "PostgreSQL")
579            .await
580    }
581
582    fn as_any(&self) -> &dyn std::any::Any {
583        self
584    }
585
586    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
587        self.base.initialize(context).await;
588    }
589
590    async fn set_bootstrap_provider(
591        &self,
592        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
593    ) {
594        self.base.set_bootstrap_provider(provider).await;
595    }
596}
597
598async fn run_replication(
599    source_id: String,
600    config: PostgresSourceConfig,
601    dispatchers: Arc<
602        RwLock<
603            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
604        >,
605    >,
606    status_handle: ComponentStatusHandle,
607) -> Result<()> {
608    info!("Starting replication for source {source_id}");
609
610    let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
611
612    stream.run().await
613}
614
615/// Builder for PostgreSQL source configuration.
616///
617/// Provides a fluent API for constructing PostgreSQL source configurations
618/// with sensible defaults.
619///
620/// # Example
621///
622/// ```rust,ignore
623/// use drasi_source_postgres::PostgresReplicationSource;
624///
625/// let source = PostgresReplicationSource::builder("pg-source")
626///     .with_host("db.example.com")
627///     .with_database("production")
628///     .with_user("replication_user")
629///     .with_password("secret")
630///     .with_tables(vec!["users".to_string(), "orders".to_string()])
631///     .with_slot_name("my_slot")
632///     .build()?;
633/// ```
634pub struct PostgresSourceBuilder {
635    id: String,
636    host: String,
637    port: u16,
638    database: String,
639    user: String,
640    password: String,
641    tables: Vec<String>,
642    slot_name: String,
643    publication_name: String,
644    ssl_mode: SslMode,
645    table_keys: Vec<TableKeyConfig>,
646    dispatch_mode: Option<DispatchMode>,
647    dispatch_buffer_capacity: Option<usize>,
648    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
649    auto_start: bool,
650}
651
652impl PostgresSourceBuilder {
653    /// Create a new PostgreSQL source builder with the given ID and default values
654    pub fn new(id: impl Into<String>) -> Self {
655        Self {
656            id: id.into(),
657            host: "localhost".to_string(),
658            port: 5432,
659            database: String::new(),
660            user: String::new(),
661            password: String::new(),
662            tables: Vec::new(),
663            slot_name: "drasi_slot".to_string(),
664            publication_name: "drasi_publication".to_string(),
665            ssl_mode: SslMode::default(),
666            table_keys: Vec::new(),
667            dispatch_mode: None,
668            dispatch_buffer_capacity: None,
669            bootstrap_provider: None,
670            auto_start: true,
671        }
672    }
673
674    /// Set the PostgreSQL host
675    pub fn with_host(mut self, host: impl Into<String>) -> Self {
676        self.host = host.into();
677        self
678    }
679
680    /// Set the PostgreSQL port
681    pub fn with_port(mut self, port: u16) -> Self {
682        self.port = port;
683        self
684    }
685
686    /// Set the database name
687    pub fn with_database(mut self, database: impl Into<String>) -> Self {
688        self.database = database.into();
689        self
690    }
691
692    /// Set the database user
693    pub fn with_user(mut self, user: impl Into<String>) -> Self {
694        self.user = user.into();
695        self
696    }
697
698    /// Set the database password
699    pub fn with_password(mut self, password: impl Into<String>) -> Self {
700        self.password = password.into();
701        self
702    }
703
704    /// Set the tables to replicate
705    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
706        self.tables = tables;
707        self
708    }
709
710    /// Add a table to replicate
711    pub fn add_table(mut self, table: impl Into<String>) -> Self {
712        self.tables.push(table.into());
713        self
714    }
715
716    /// Set the replication slot name
717    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
718        self.slot_name = slot_name.into();
719        self
720    }
721
722    /// Set the publication name
723    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
724        self.publication_name = publication_name.into();
725        self
726    }
727
728    /// Set the SSL mode
729    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
730        self.ssl_mode = ssl_mode;
731        self
732    }
733
734    /// Set the table key configurations
735    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
736        self.table_keys = table_keys;
737        self
738    }
739
740    /// Add a table key configuration
741    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
742        self.table_keys.push(table_key);
743        self
744    }
745
746    /// Set the dispatch mode for this source
747    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
748        self.dispatch_mode = Some(mode);
749        self
750    }
751
752    /// Set the dispatch buffer capacity for this source
753    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
754        self.dispatch_buffer_capacity = Some(capacity);
755        self
756    }
757
758    /// Set the bootstrap provider for this source
759    pub fn with_bootstrap_provider(
760        mut self,
761        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
762    ) -> Self {
763        self.bootstrap_provider = Some(Box::new(provider));
764        self
765    }
766
767    /// Set whether this source should auto-start when DrasiLib starts.
768    ///
769    /// Default is `true`. Set to `false` if this source should only be
770    /// started manually via `start_source()`.
771    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
772        self.auto_start = auto_start;
773        self
774    }
775
776    /// Set the full configuration at once
777    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
778        self.host = config.host;
779        self.port = config.port;
780        self.database = config.database;
781        self.user = config.user;
782        self.password = config.password;
783        self.tables = config.tables;
784        self.slot_name = config.slot_name;
785        self.publication_name = config.publication_name;
786        self.ssl_mode = config.ssl_mode;
787        self.table_keys = config.table_keys;
788        self
789    }
790
791    /// Build the PostgreSQL replication source
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if the source cannot be constructed.
796    pub fn build(self) -> Result<PostgresReplicationSource> {
797        let config = PostgresSourceConfig {
798            host: self.host,
799            port: self.port,
800            database: self.database,
801            user: self.user,
802            password: self.password,
803            tables: self.tables,
804            slot_name: self.slot_name,
805            publication_name: self.publication_name,
806            ssl_mode: self.ssl_mode,
807            table_keys: self.table_keys,
808        };
809
810        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
811        if let Some(mode) = self.dispatch_mode {
812            params = params.with_dispatch_mode(mode);
813        }
814        if let Some(capacity) = self.dispatch_buffer_capacity {
815            params = params.with_dispatch_buffer_capacity(capacity);
816        }
817        if let Some(provider) = self.bootstrap_provider {
818            params = params.with_bootstrap_provider(provider);
819        }
820
821        Ok(PostgresReplicationSource {
822            base: SourceBase::new(params)?,
823            config,
824            cached_schema: Arc::new(std::sync::RwLock::new(None)),
825        })
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use super::*;
832
833    mod construction {
834        use super::*;
835
836        #[test]
837        fn test_builder_with_valid_config() {
838            let source = PostgresSourceBuilder::new("test-source")
839                .with_database("testdb")
840                .with_user("testuser")
841                .build();
842            assert!(source.is_ok());
843        }
844
845        #[test]
846        fn test_builder_with_custom_config() {
847            let source = PostgresSourceBuilder::new("pg-source")
848                .with_host("192.168.1.100")
849                .with_port(5433)
850                .with_database("production")
851                .with_user("admin")
852                .with_password("secret")
853                .build()
854                .unwrap();
855            assert_eq!(source.id(), "pg-source");
856        }
857
858        #[test]
859        fn test_with_dispatch_creates_source() {
860            let config = PostgresSourceConfig {
861                host: "localhost".to_string(),
862                port: 5432,
863                database: "testdb".to_string(),
864                user: "testuser".to_string(),
865                password: String::new(),
866                tables: Vec::new(),
867                slot_name: "drasi_slot".to_string(),
868                publication_name: "drasi_publication".to_string(),
869                ssl_mode: SslMode::default(),
870                table_keys: Vec::new(),
871            };
872            let source = PostgresReplicationSource::with_dispatch(
873                "dispatch-source",
874                config,
875                Some(DispatchMode::Channel),
876                Some(2000),
877            );
878            assert!(source.is_ok());
879            assert_eq!(source.unwrap().id(), "dispatch-source");
880        }
881    }
882
883    mod properties {
884        use super::*;
885
886        #[test]
887        fn test_id_returns_correct_value() {
888            let source = PostgresSourceBuilder::new("my-pg-source")
889                .with_database("db")
890                .with_user("user")
891                .build()
892                .unwrap();
893            assert_eq!(source.id(), "my-pg-source");
894        }
895
896        #[test]
897        fn test_type_name_returns_postgres() {
898            let source = PostgresSourceBuilder::new("test")
899                .with_database("db")
900                .with_user("user")
901                .build()
902                .unwrap();
903            assert_eq!(source.type_name(), "postgres");
904        }
905
906        #[test]
907        fn test_properties_contains_connection_info() {
908            let source = PostgresSourceBuilder::new("test")
909                .with_host("db.example.com")
910                .with_port(5433)
911                .with_database("mydb")
912                .with_user("app_user")
913                .with_password("secret")
914                .with_tables(vec!["users".to_string()])
915                .build()
916                .unwrap();
917            let props = source.properties();
918
919            assert_eq!(
920                props.get("host"),
921                Some(&serde_json::Value::String("db.example.com".to_string()))
922            );
923            assert_eq!(
924                props.get("port"),
925                Some(&serde_json::Value::Number(5433.into()))
926            );
927            assert_eq!(
928                props.get("database"),
929                Some(&serde_json::Value::String("mydb".to_string()))
930            );
931            assert_eq!(
932                props.get("user"),
933                Some(&serde_json::Value::String("app_user".to_string()))
934            );
935        }
936
937        #[test]
938        fn test_properties_includes_password() {
939            let source = PostgresSourceBuilder::new("test")
940                .with_database("db")
941                .with_user("user")
942                .with_password("super_secret_password")
943                .build()
944                .unwrap();
945            let props = source.properties();
946
947            // Password must be preserved for config persistence roundtrip
948            assert_eq!(
949                props.get("password"),
950                Some(&serde_json::Value::String(
951                    "super_secret_password".to_string()
952                ))
953            );
954        }
955
956        #[test]
957        fn test_properties_includes_tables() {
958            let source = PostgresSourceBuilder::new("test")
959                .with_database("db")
960                .with_user("user")
961                .with_tables(vec!["users".to_string(), "orders".to_string()])
962                .build()
963                .unwrap();
964            let props = source.properties();
965
966            let tables = props.get("tables").unwrap().as_array().unwrap();
967            assert_eq!(tables.len(), 2);
968            assert_eq!(tables[0], "users");
969            assert_eq!(tables[1], "orders");
970        }
971
972        #[test]
973        fn test_describe_schema_falls_back_to_configured_tables() {
974            let source = PostgresSourceBuilder::new("test")
975                .with_database("db")
976                .with_user("user")
977                .with_tables(vec!["public.users".to_string(), "orders".to_string()])
978                .build()
979                .unwrap();
980
981            let schema = source
982                .describe_schema()
983                .expect("configured postgres tables should produce fallback schema");
984
985            assert_eq!(schema.nodes.len(), 2);
986            assert!(schema.nodes.iter().any(|node| node.label == "users"));
987            assert!(schema.nodes.iter().any(|node| node.label == "orders"));
988        }
989
990        #[test]
991        fn test_postgres_type_to_property_type_integer() {
992            assert_eq!(
993                postgres_type_to_property_type("integer"),
994                Some(PropertyType::Integer)
995            );
996            assert_eq!(
997                postgres_type_to_property_type("bigint"),
998                Some(PropertyType::Integer)
999            );
1000            assert_eq!(
1001                postgres_type_to_property_type("smallint"),
1002                Some(PropertyType::Integer)
1003            );
1004        }
1005
1006        #[test]
1007        fn test_postgres_type_to_property_type_float() {
1008            assert_eq!(
1009                postgres_type_to_property_type("double precision"),
1010                Some(PropertyType::Float)
1011            );
1012            assert_eq!(
1013                postgres_type_to_property_type("real"),
1014                Some(PropertyType::Float)
1015            );
1016            assert_eq!(
1017                postgres_type_to_property_type("numeric"),
1018                Some(PropertyType::Float)
1019            );
1020            assert_eq!(
1021                postgres_type_to_property_type("decimal"),
1022                Some(PropertyType::Float)
1023            );
1024        }
1025
1026        #[test]
1027        fn test_postgres_type_to_property_type_boolean() {
1028            assert_eq!(
1029                postgres_type_to_property_type("boolean"),
1030                Some(PropertyType::Boolean)
1031            );
1032        }
1033
1034        #[test]
1035        fn test_postgres_type_to_property_type_timestamp() {
1036            assert_eq!(
1037                postgres_type_to_property_type("timestamp with time zone"),
1038                Some(PropertyType::Timestamp)
1039            );
1040            assert_eq!(
1041                postgres_type_to_property_type("timestamp without time zone"),
1042                Some(PropertyType::Timestamp)
1043            );
1044            assert_eq!(
1045                postgres_type_to_property_type("date"),
1046                Some(PropertyType::Timestamp)
1047            );
1048        }
1049
1050        #[test]
1051        fn test_postgres_type_to_property_type_json() {
1052            assert_eq!(
1053                postgres_type_to_property_type("json"),
1054                Some(PropertyType::Json)
1055            );
1056            assert_eq!(
1057                postgres_type_to_property_type("jsonb"),
1058                Some(PropertyType::Json)
1059            );
1060        }
1061
1062        #[test]
1063        fn test_postgres_type_to_property_type_string() {
1064            assert_eq!(
1065                postgres_type_to_property_type("character varying"),
1066                Some(PropertyType::String)
1067            );
1068            assert_eq!(
1069                postgres_type_to_property_type("text"),
1070                Some(PropertyType::String)
1071            );
1072            assert_eq!(
1073                postgres_type_to_property_type("uuid"),
1074                Some(PropertyType::String)
1075            );
1076        }
1077
1078        #[test]
1079        fn test_postgres_type_to_property_type_unknown_returns_none() {
1080            assert_eq!(postgres_type_to_property_type("point"), None);
1081            assert_eq!(postgres_type_to_property_type("polygon"), None);
1082            assert_eq!(postgres_type_to_property_type("cidr"), None);
1083        }
1084    }
1085
1086    mod lifecycle {
1087        use super::*;
1088
1089        /// A test secret resolver that returns a fixed value for any secret name.
1090        struct TestSecretResolver;
1091
1092        impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
1093            fn resolve_to_string(
1094                &self,
1095                value: &drasi_plugin_sdk::ConfigValue<String>,
1096            ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
1097                match value {
1098                    drasi_plugin_sdk::ConfigValue::Secret { name } => {
1099                        Ok(format!("resolved-secret-{name}"))
1100                    }
1101                    _ => Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType),
1102                }
1103            }
1104        }
1105
1106        fn ensure_test_secret_resolver() {
1107            let _ = drasi_plugin_sdk::resolver::register_secret_resolver(std::sync::Arc::new(
1108                TestSecretResolver,
1109            ));
1110        }
1111
1112        #[tokio::test]
1113        async fn test_descriptor_preserves_secret_envelope() {
1114            use crate::descriptor::PostgresSourceDescriptor;
1115            use drasi_lib::sources::Source;
1116            use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;
1117
1118            ensure_test_secret_resolver();
1119
1120            let config_json = serde_json::json!({
1121                "host": "db.example.com",
1122                "port": 5432,
1123                "database": "mydb",
1124                "user": "app_user",
1125                "password": {
1126                    "kind": "Secret",
1127                    "name": "db-password"
1128                },
1129                "tables": ["users"],
1130                "slotName": "drasi_slot",
1131                "publicationName": "drasi_pub"
1132            });
1133
1134            let descriptor = PostgresSourceDescriptor;
1135            let source = descriptor
1136                .create_source("pg-secret-test", &config_json, true)
1137                .await
1138                .expect("descriptor should create source");
1139
1140            let props = source.properties();
1141
1142            // Password must be the Secret envelope, NOT the resolved value
1143            let password = props.get("password").expect("password must be present");
1144            assert!(
1145                password.is_object(),
1146                "password should be Secret envelope, got: {password}"
1147            );
1148            assert_eq!(
1149                password.get("kind").and_then(|v| v.as_str()),
1150                Some("Secret"),
1151                "envelope kind must be Secret"
1152            );
1153            assert_eq!(
1154                password.get("name").and_then(|v| v.as_str()),
1155                Some("db-password"),
1156                "secret name must be preserved"
1157            );
1158
1159            // Resolved value must NOT leak into persisted properties
1160            let props_str = serde_json::to_string(&props).unwrap();
1161            assert!(
1162                !props_str.contains("resolved-secret-db-password"),
1163                "resolved secret must not appear in properties"
1164            );
1165
1166            // Keys must be camelCase (from raw_config)
1167            assert!(
1168                props.contains_key("slotName"),
1169                "expected camelCase 'slotName', got keys: {:?}",
1170                props.keys().collect::<Vec<_>>()
1171            );
1172            assert!(
1173                props.contains_key("publicationName"),
1174                "expected camelCase 'publicationName'"
1175            );
1176        }
1177
1178        #[tokio::test]
1179        async fn test_initial_status_is_stopped() {
1180            let source = PostgresSourceBuilder::new("test")
1181                .with_database("db")
1182                .with_user("user")
1183                .build()
1184                .unwrap();
1185            assert_eq!(source.status().await, ComponentStatus::Stopped);
1186        }
1187
1188        #[test]
1189        fn test_builder_fallback_produces_camel_case() {
1190            use drasi_lib::sources::Source;
1191
1192            let source = PostgresSourceBuilder::new("pg-fallback")
1193                .with_host("myhost.example.com")
1194                .with_port(5433)
1195                .with_database("mydb")
1196                .with_user("admin")
1197                .with_password("secret123")
1198                .with_ssl_mode(SslMode::Require)
1199                .with_slot_name("custom_slot")
1200                .with_publication_name("custom_pub")
1201                .build()
1202                .unwrap();
1203
1204            let props = source.properties();
1205
1206            // Must use camelCase keys (DTO serialization)
1207            assert!(
1208                props.contains_key("slotName"),
1209                "expected camelCase 'slotName', got keys: {:?}",
1210                props.keys().collect::<Vec<_>>()
1211            );
1212            assert!(
1213                props.contains_key("publicationName"),
1214                "expected camelCase 'publicationName'"
1215            );
1216            assert!(
1217                props.contains_key("sslMode"),
1218                "expected camelCase 'sslMode'"
1219            );
1220
1221            // Must NOT have snake_case keys
1222            assert!(
1223                !props.contains_key("slot_name"),
1224                "should not have snake_case 'slot_name'"
1225            );
1226            assert!(
1227                !props.contains_key("publication_name"),
1228                "should not have snake_case 'publication_name'"
1229            );
1230
1231            // Values should be correct
1232            assert_eq!(
1233                props.get("host").and_then(|v| v.as_str()),
1234                Some("myhost.example.com")
1235            );
1236            assert_eq!(props.get("port").and_then(|v| v.as_u64()), Some(5433));
1237            assert_eq!(props.get("database").and_then(|v| v.as_str()), Some("mydb"));
1238            assert_eq!(
1239                props.get("password").and_then(|v| v.as_str()),
1240                Some("secret123")
1241            );
1242        }
1243    }
1244
1245    mod builder {
1246        use super::*;
1247
1248        #[test]
1249        fn test_postgres_builder_defaults() {
1250            let source = PostgresSourceBuilder::new("test").build().unwrap();
1251            assert_eq!(source.config.host, "localhost");
1252            assert_eq!(source.config.port, 5432);
1253            assert_eq!(source.config.slot_name, "drasi_slot");
1254            assert_eq!(source.config.publication_name, "drasi_publication");
1255        }
1256
1257        #[test]
1258        fn test_postgres_builder_custom_values() {
1259            let source = PostgresSourceBuilder::new("test")
1260                .with_host("db.example.com")
1261                .with_port(5433)
1262                .with_database("production")
1263                .with_user("app_user")
1264                .with_password("secret")
1265                .with_tables(vec!["users".to_string(), "orders".to_string()])
1266                .build()
1267                .unwrap();
1268
1269            assert_eq!(source.config.host, "db.example.com");
1270            assert_eq!(source.config.port, 5433);
1271            assert_eq!(source.config.database, "production");
1272            assert_eq!(source.config.user, "app_user");
1273            assert_eq!(source.config.password, "secret");
1274            assert_eq!(source.config.tables.len(), 2);
1275            assert_eq!(source.config.tables[0], "users");
1276            assert_eq!(source.config.tables[1], "orders");
1277        }
1278
1279        #[test]
1280        fn test_builder_add_table() {
1281            let source = PostgresSourceBuilder::new("test")
1282                .add_table("table1")
1283                .add_table("table2")
1284                .add_table("table3")
1285                .build()
1286                .unwrap();
1287
1288            assert_eq!(source.config.tables.len(), 3);
1289            assert_eq!(source.config.tables[0], "table1");
1290            assert_eq!(source.config.tables[1], "table2");
1291            assert_eq!(source.config.tables[2], "table3");
1292        }
1293
1294        #[test]
1295        fn test_builder_slot_and_publication() {
1296            let source = PostgresSourceBuilder::new("test")
1297                .with_slot_name("custom_slot")
1298                .with_publication_name("custom_pub")
1299                .build()
1300                .unwrap();
1301
1302            assert_eq!(source.config.slot_name, "custom_slot");
1303            assert_eq!(source.config.publication_name, "custom_pub");
1304        }
1305
1306        #[test]
1307        fn test_builder_id() {
1308            let source = PostgresReplicationSource::builder("my-pg-source")
1309                .with_database("db")
1310                .with_user("user")
1311                .build()
1312                .unwrap();
1313
1314            assert_eq!(source.base.id, "my-pg-source");
1315        }
1316    }
1317
1318    mod config {
1319        use super::*;
1320
1321        #[test]
1322        fn test_config_serialization() {
1323            let config = PostgresSourceConfig {
1324                host: "localhost".to_string(),
1325                port: 5432,
1326                database: "testdb".to_string(),
1327                user: "testuser".to_string(),
1328                password: String::new(),
1329                tables: Vec::new(),
1330                slot_name: "drasi_slot".to_string(),
1331                publication_name: "drasi_publication".to_string(),
1332                ssl_mode: SslMode::default(),
1333                table_keys: Vec::new(),
1334            };
1335
1336            let json = serde_json::to_string(&config).unwrap();
1337            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
1338
1339            assert_eq!(config, deserialized);
1340        }
1341
1342        #[test]
1343        fn test_config_deserialization_with_required_fields() {
1344            let json = r#"{
1345                "database": "mydb",
1346                "user": "myuser"
1347            }"#;
1348            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1349
1350            assert_eq!(config.database, "mydb");
1351            assert_eq!(config.user, "myuser");
1352            assert_eq!(config.host, "localhost"); // default
1353            assert_eq!(config.port, 5432); // default
1354            assert_eq!(config.slot_name, "drasi_slot"); // default
1355        }
1356
1357        #[test]
1358        fn test_config_deserialization_full() {
1359            let json = r#"{
1360                "host": "db.prod.internal",
1361                "port": 5433,
1362                "database": "production",
1363                "user": "replication_user",
1364                "password": "secret",
1365                "tables": ["accounts", "transactions"],
1366                "slot_name": "prod_slot",
1367                "publication_name": "prod_publication"
1368            }"#;
1369            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1370
1371            assert_eq!(config.host, "db.prod.internal");
1372            assert_eq!(config.port, 5433);
1373            assert_eq!(config.database, "production");
1374            assert_eq!(config.user, "replication_user");
1375            assert_eq!(config.password, "secret");
1376            assert_eq!(config.tables, vec!["accounts", "transactions"]);
1377            assert_eq!(config.slot_name, "prod_slot");
1378            assert_eq!(config.publication_name, "prod_publication");
1379        }
1380    }
1381}
1382
// Dynamic plugin entry point: when built with the `dynamic-plugin` feature, exports the
// PostgreSQL source descriptor under plugin id "postgres-source" (no reaction or bootstrap
// descriptors). A plain `//` comment is used on purpose: rustdoc ignores doc comments on
// macro invocations, and rustc reports them via the `unused_doc_comments` lint.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);