1#![allow(unexpected_cfgs)]
2pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::component_graph::ComponentStatusHandle;
191use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
192use drasi_lib::Source;
193use tracing::Instrument;
194
/// PostgreSQL source that runs a replication stream as a background task.
///
/// Instances are created with [`PostgresReplicationSource::new`],
/// [`PostgresReplicationSource::with_dispatch`], or the fluent
/// [`PostgresSourceBuilder`].
pub struct PostgresReplicationSource {
    /// Shared source state provided by `drasi_lib` (id, status, dispatchers,
    /// and the handle of the background task).
    base: SourceBase,
    /// Connection and replication settings for this source.
    config: PostgresSourceConfig,
}
210
211impl PostgresReplicationSource {
212 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
229 PostgresSourceBuilder::new(id)
230 }
231
232 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
264 let id = id.into();
265 let params = SourceBaseParams::new(id);
266 Ok(Self {
267 base: SourceBase::new(params)?,
268 config,
269 })
270 }
271
272 pub fn with_dispatch(
277 id: impl Into<String>,
278 config: PostgresSourceConfig,
279 dispatch_mode: Option<DispatchMode>,
280 dispatch_buffer_capacity: Option<usize>,
281 ) -> Result<Self> {
282 let id = id.into();
283 let mut params = SourceBaseParams::new(id);
284 if let Some(mode) = dispatch_mode {
285 params = params.with_dispatch_mode(mode);
286 }
287 if let Some(capacity) = dispatch_buffer_capacity {
288 params = params.with_dispatch_buffer_capacity(capacity);
289 }
290 Ok(Self {
291 base: SourceBase::new(params)?,
292 config,
293 })
294 }
295}
296
297#[async_trait]
298impl Source for PostgresReplicationSource {
299 fn id(&self) -> &str {
300 &self.base.id
301 }
302
303 fn type_name(&self) -> &str {
304 "postgres"
305 }
306
307 fn properties(&self) -> HashMap<String, serde_json::Value> {
308 match serde_json::to_value(&self.config) {
309 Ok(serde_json::Value::Object(mut map)) => {
310 map.remove("password");
311 map.into_iter().collect()
312 }
313 _ => HashMap::new(),
314 }
315 }
316
317 fn auto_start(&self) -> bool {
318 self.base.get_auto_start()
319 }
320
321 async fn start(&self) -> Result<()> {
322 if self.base.get_status().await == ComponentStatus::Running {
323 return Ok(());
324 }
325
326 self.base.set_status(ComponentStatus::Starting, None).await;
327 info!("Starting PostgreSQL replication source: {}", self.base.id);
328
329 let config = self.config.clone();
330 let source_id = self.base.id.clone();
331 let dispatchers = self.base.dispatchers.clone();
332 let reporter = self.base.status_handle();
333
334 let instance_id = self
336 .base
337 .context()
338 .await
339 .map(|c| c.instance_id)
340 .unwrap_or_default();
341
342 let source_id_for_span = source_id.clone();
344 let span = tracing::info_span!(
345 "postgres_replication_task",
346 instance_id = %instance_id,
347 component_id = %source_id_for_span,
348 component_type = "source"
349 );
350
351 let task = tokio::spawn(
352 async move {
353 if let Err(e) =
354 run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
355 {
356 error!("Replication task failed for {source_id}: {e}");
357 reporter
358 .set_status(
359 ComponentStatus::Error,
360 Some(format!("Replication failed: {e}")),
361 )
362 .await;
363 }
364 }
365 .instrument(span),
366 );
367
368 *self.base.task_handle.write().await = Some(task);
369 self.base
370 .set_status(
371 ComponentStatus::Running,
372 Some("PostgreSQL replication started".to_string()),
373 )
374 .await;
375
376 Ok(())
377 }
378
379 async fn stop(&self) -> Result<()> {
380 if self.base.get_status().await != ComponentStatus::Running {
381 return Ok(());
382 }
383
384 info!("Stopping PostgreSQL replication source: {}", self.base.id);
385
386 self.base.set_status(ComponentStatus::Stopping, None).await;
387
388 if let Some(task) = self.base.task_handle.write().await.take() {
390 task.abort();
391 }
392
393 self.base
394 .set_status(
395 ComponentStatus::Stopped,
396 Some("PostgreSQL replication stopped".to_string()),
397 )
398 .await;
399
400 Ok(())
401 }
402
403 async fn status(&self) -> ComponentStatus {
404 self.base.get_status().await
405 }
406
407 async fn subscribe(
408 &self,
409 settings: drasi_lib::config::SourceSubscriptionSettings,
410 ) -> Result<SubscriptionResponse> {
411 self.base
412 .subscribe_with_bootstrap(&settings, "PostgreSQL")
413 .await
414 }
415
416 fn as_any(&self) -> &dyn std::any::Any {
417 self
418 }
419
420 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
421 self.base.initialize(context).await;
422 }
423
424 async fn set_bootstrap_provider(
425 &self,
426 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
427 ) {
428 self.base.set_bootstrap_provider(provider).await;
429 }
430}
431
432async fn run_replication(
433 source_id: String,
434 config: PostgresSourceConfig,
435 dispatchers: Arc<
436 RwLock<
437 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
438 >,
439 >,
440 status_handle: ComponentStatusHandle,
441) -> Result<()> {
442 info!("Starting replication for source {source_id}");
443
444 let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
445
446 stream.run().await
447}
448
449pub struct PostgresSourceBuilder {
469 id: String,
470 host: String,
471 port: u16,
472 database: String,
473 user: String,
474 password: String,
475 tables: Vec<String>,
476 slot_name: String,
477 publication_name: String,
478 ssl_mode: SslMode,
479 table_keys: Vec<TableKeyConfig>,
480 dispatch_mode: Option<DispatchMode>,
481 dispatch_buffer_capacity: Option<usize>,
482 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
483 auto_start: bool,
484}
485
486impl PostgresSourceBuilder {
487 pub fn new(id: impl Into<String>) -> Self {
489 Self {
490 id: id.into(),
491 host: "localhost".to_string(),
492 port: 5432,
493 database: String::new(),
494 user: String::new(),
495 password: String::new(),
496 tables: Vec::new(),
497 slot_name: "drasi_slot".to_string(),
498 publication_name: "drasi_publication".to_string(),
499 ssl_mode: SslMode::default(),
500 table_keys: Vec::new(),
501 dispatch_mode: None,
502 dispatch_buffer_capacity: None,
503 bootstrap_provider: None,
504 auto_start: true,
505 }
506 }
507
508 pub fn with_host(mut self, host: impl Into<String>) -> Self {
510 self.host = host.into();
511 self
512 }
513
514 pub fn with_port(mut self, port: u16) -> Self {
516 self.port = port;
517 self
518 }
519
520 pub fn with_database(mut self, database: impl Into<String>) -> Self {
522 self.database = database.into();
523 self
524 }
525
526 pub fn with_user(mut self, user: impl Into<String>) -> Self {
528 self.user = user.into();
529 self
530 }
531
532 pub fn with_password(mut self, password: impl Into<String>) -> Self {
534 self.password = password.into();
535 self
536 }
537
538 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
540 self.tables = tables;
541 self
542 }
543
544 pub fn add_table(mut self, table: impl Into<String>) -> Self {
546 self.tables.push(table.into());
547 self
548 }
549
550 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
552 self.slot_name = slot_name.into();
553 self
554 }
555
556 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
558 self.publication_name = publication_name.into();
559 self
560 }
561
562 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
564 self.ssl_mode = ssl_mode;
565 self
566 }
567
568 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
570 self.table_keys = table_keys;
571 self
572 }
573
574 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
576 self.table_keys.push(table_key);
577 self
578 }
579
580 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
582 self.dispatch_mode = Some(mode);
583 self
584 }
585
586 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
588 self.dispatch_buffer_capacity = Some(capacity);
589 self
590 }
591
592 pub fn with_bootstrap_provider(
594 mut self,
595 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
596 ) -> Self {
597 self.bootstrap_provider = Some(Box::new(provider));
598 self
599 }
600
601 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
606 self.auto_start = auto_start;
607 self
608 }
609
610 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
612 self.host = config.host;
613 self.port = config.port;
614 self.database = config.database;
615 self.user = config.user;
616 self.password = config.password;
617 self.tables = config.tables;
618 self.slot_name = config.slot_name;
619 self.publication_name = config.publication_name;
620 self.ssl_mode = config.ssl_mode;
621 self.table_keys = config.table_keys;
622 self
623 }
624
625 pub fn build(self) -> Result<PostgresReplicationSource> {
631 let config = PostgresSourceConfig {
632 host: self.host,
633 port: self.port,
634 database: self.database,
635 user: self.user,
636 password: self.password,
637 tables: self.tables,
638 slot_name: self.slot_name,
639 publication_name: self.publication_name,
640 ssl_mode: self.ssl_mode,
641 table_keys: self.table_keys,
642 };
643
644 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
645 if let Some(mode) = self.dispatch_mode {
646 params = params.with_dispatch_mode(mode);
647 }
648 if let Some(capacity) = self.dispatch_buffer_capacity {
649 params = params.with_dispatch_buffer_capacity(capacity);
650 }
651 if let Some(provider) = self.bootstrap_provider {
652 params = params.with_bootstrap_provider(provider);
653 }
654
655 Ok(PostgresReplicationSource {
656 base: SourceBase::new(params)?,
657 config,
658 })
659 }
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder carrying the placeholder database and user shared by many tests.
    fn minimal_builder(id: &str) -> PostgresSourceBuilder {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
    }

    /// Configuration with the builder's default values plus a test database
    /// and user.
    fn sample_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: String::from("localhost"),
            port: 5432,
            database: String::from("testdb"),
            user: String::from("testuser"),
            password: String::new(),
            tables: Vec::new(),
            slot_name: String::from("drasi_slot"),
            publication_name: String::from("drasi_publication"),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();

            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let builder = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret");
            let source = builder.build().unwrap();

            assert_eq!(source.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let built = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                sample_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );

            assert!(built.is_ok());
            assert_eq!(built.unwrap().id(), "dispatch-source");
        }
    }

    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            let source = minimal_builder("my-pg-source").build().unwrap();

            assert_eq!(source.id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            let source = minimal_builder("test").build().unwrap();

            assert_eq!(source.type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let props = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap()
                .properties();

            assert_eq!(
                props.get("host"),
                Some(&serde_json::json!("db.example.com"))
            );
            assert_eq!(props.get("port"), Some(&serde_json::json!(5433)));
            assert_eq!(props.get("database"), Some(&serde_json::json!("mydb")));
            assert_eq!(props.get("user"), Some(&serde_json::json!("app_user")));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let props = minimal_builder("test")
                .with_password("super_secret_password")
                .build()
                .unwrap()
                .properties();

            assert!(!props.contains_key("password"));
        }

        #[test]
        fn test_properties_includes_tables() {
            let props = minimal_builder("test")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap()
                .properties();

            let tables = props.get("tables").unwrap().as_array().unwrap();
            assert_eq!(tables.len(), 2);
            assert_eq!(tables[0], "users");
            assert_eq!(tables[1], "orders");
        }
    }

    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = minimal_builder("test").build().unwrap();

            assert_eq!(source.status().await, ComponentStatus::Stopped);
        }
    }

    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let source = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables.len(), 2);
            assert_eq!(cfg.tables[0], "users");
            assert_eq!(cfg.tables[1], "orders");
        }

        #[test]
        fn test_builder_add_table() {
            let source = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();
            let tables = &source.config.tables;

            assert_eq!(tables.len(), 3);
            assert_eq!(tables[0], "table1");
            assert_eq!(tables[1], "table2");
            assert_eq!(tables[2], "table3");
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let source = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            assert_eq!(source.config.slot_name, "custom_slot");
            assert_eq!(source.config.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            let source = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-pg-source");
        }
    }

    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let original = sample_config();

            let json = serde_json::to_string(&original).unwrap();
            let round_tripped: PostgresSourceConfig = serde_json::from_str(&json).unwrap();

            assert_eq!(original, round_tripped);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            // Values supplied in the JSON.
            assert_eq!(parsed.database, "mydb");
            assert_eq!(parsed.user, "myuser");
            // Values absent from the JSON.
            assert_eq!(parsed.host, "localhost");
            assert_eq!(parsed.port, 5432);
            assert_eq!(parsed.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, vec!["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}
952
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled.
// Registers `descriptor::PostgresSourceDescriptor` as the single source
// descriptor under the plugin id "postgres-source"; no reaction or bootstrap
// descriptors are exported. All three version fields are taken from this
// crate's `CARGO_PKG_VERSION`.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);