1#![allow(unexpected_cfgs)]
2pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::component_graph::ComponentStatusHandle;
191use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
192use drasi_lib::Source;
193use tracing::Instrument;
194
/// Source component pairing a PostgreSQL source configuration with the shared
/// [`SourceBase`] state that the `Source` implementation delegates to.
///
/// Instances are created with [`PostgresReplicationSource::new`],
/// [`PostgresReplicationSource::with_dispatch`], or through a
/// [`PostgresSourceBuilder`].
pub struct PostgresReplicationSource {
    /// Shared source state: the id, status, dispatchers and background task
    /// handle used by the `Source` implementation all live here.
    base: SourceBase,
    /// Connection and replication settings; cloned into the replication task
    /// each time the source is started.
    config: PostgresSourceConfig,
}
210
211impl PostgresReplicationSource {
212 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
229 PostgresSourceBuilder::new(id)
230 }
231
232 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
264 let id = id.into();
265 let params = SourceBaseParams::new(id);
266 Ok(Self {
267 base: SourceBase::new(params)?,
268 config,
269 })
270 }
271
272 pub fn with_dispatch(
277 id: impl Into<String>,
278 config: PostgresSourceConfig,
279 dispatch_mode: Option<DispatchMode>,
280 dispatch_buffer_capacity: Option<usize>,
281 ) -> Result<Self> {
282 let id = id.into();
283 let mut params = SourceBaseParams::new(id);
284 if let Some(mode) = dispatch_mode {
285 params = params.with_dispatch_mode(mode);
286 }
287 if let Some(capacity) = dispatch_buffer_capacity {
288 params = params.with_dispatch_buffer_capacity(capacity);
289 }
290 Ok(Self {
291 base: SourceBase::new(params)?,
292 config,
293 })
294 }
295}
296
297#[async_trait]
298impl Source for PostgresReplicationSource {
299 fn id(&self) -> &str {
300 &self.base.id
301 }
302
303 fn type_name(&self) -> &str {
304 "postgres"
305 }
306
307 fn properties(&self) -> HashMap<String, serde_json::Value> {
308 use crate::descriptor::PostgresSourceConfigDto;
309
310 self.base
311 .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
312 }
313
314 fn auto_start(&self) -> bool {
315 self.base.get_auto_start()
316 }
317
318 async fn start(&self) -> Result<()> {
319 if self.base.get_status().await == ComponentStatus::Running {
320 return Ok(());
321 }
322
323 self.base.set_status(ComponentStatus::Starting, None).await;
324 info!("Starting PostgreSQL replication source: {}", self.base.id);
325
326 let config = self.config.clone();
327 let source_id = self.base.id.clone();
328 let dispatchers = self.base.dispatchers.clone();
329 let reporter = self.base.status_handle();
330
331 let instance_id = self
333 .base
334 .context()
335 .await
336 .map(|c| c.instance_id)
337 .unwrap_or_default();
338
339 let source_id_for_span = source_id.clone();
341 let span = tracing::info_span!(
342 "postgres_replication_task",
343 instance_id = %instance_id,
344 component_id = %source_id_for_span,
345 component_type = "source"
346 );
347
348 let task = tokio::spawn(
349 async move {
350 if let Err(e) =
351 run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
352 {
353 error!("Replication task failed for {source_id}: {e}");
354 reporter
355 .set_status(
356 ComponentStatus::Error,
357 Some(format!("Replication failed: {e}")),
358 )
359 .await;
360 }
361 }
362 .instrument(span),
363 );
364
365 *self.base.task_handle.write().await = Some(task);
366 self.base
367 .set_status(
368 ComponentStatus::Running,
369 Some("PostgreSQL replication started".to_string()),
370 )
371 .await;
372
373 Ok(())
374 }
375
376 async fn stop(&self) -> Result<()> {
377 if self.base.get_status().await != ComponentStatus::Running {
378 return Ok(());
379 }
380
381 info!("Stopping PostgreSQL replication source: {}", self.base.id);
382
383 self.base.set_status(ComponentStatus::Stopping, None).await;
384
385 if let Some(task) = self.base.task_handle.write().await.take() {
387 task.abort();
388 }
389
390 self.base
391 .set_status(
392 ComponentStatus::Stopped,
393 Some("PostgreSQL replication stopped".to_string()),
394 )
395 .await;
396
397 Ok(())
398 }
399
400 async fn status(&self) -> ComponentStatus {
401 self.base.get_status().await
402 }
403
404 async fn subscribe(
405 &self,
406 settings: drasi_lib::config::SourceSubscriptionSettings,
407 ) -> Result<SubscriptionResponse> {
408 self.base
409 .subscribe_with_bootstrap(&settings, "PostgreSQL")
410 .await
411 }
412
413 fn as_any(&self) -> &dyn std::any::Any {
414 self
415 }
416
417 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
418 self.base.initialize(context).await;
419 }
420
421 async fn set_bootstrap_provider(
422 &self,
423 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
424 ) {
425 self.base.set_bootstrap_provider(provider).await;
426 }
427}
428
429async fn run_replication(
430 source_id: String,
431 config: PostgresSourceConfig,
432 dispatchers: Arc<
433 RwLock<
434 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
435 >,
436 >,
437 status_handle: ComponentStatusHandle,
438) -> Result<()> {
439 info!("Starting replication for source {source_id}");
440
441 let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
442
443 stream.run().await
444}
445
/// Fluent builder for [`PostgresReplicationSource`].
///
/// Each setter consumes the builder and returns it, so calls can be chained.
/// [`PostgresSourceBuilder::build`] turns the collected values into a
/// `PostgresSourceConfig` plus the `SourceBaseParams` for the source.
pub struct PostgresSourceBuilder {
    /// Source id handed to `SourceBaseParams::new`.
    id: String,
    /// Database host; initialised to `"localhost"` by `new`.
    host: String,
    /// Database port; initialised to `5432` by `new`.
    port: u16,
    /// Database name; empty until set.
    database: String,
    /// Database user; empty until set.
    user: String,
    /// Database password; empty until set.
    password: String,
    /// Table names copied into the configuration's `tables`; empty until set.
    tables: Vec<String>,
    /// Replication slot name; initialised to `"drasi_slot"` by `new`.
    slot_name: String,
    /// Publication name; initialised to `"drasi_publication"` by `new`.
    publication_name: String,
    /// SSL mode; initialised to `SslMode::default()` by `new`.
    ssl_mode: SslMode,
    /// Per-table key configuration; empty until set.
    table_keys: Vec<TableKeyConfig>,
    /// Optional dispatch mode override; applied in `build` only when `Some`.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity override; applied in `build` only
    /// when `Some`.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider; applied in `build` only when `Some`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag passed to `SourceBaseParams`; initialised to `true` by
    /// `new`.
    auto_start: bool,
}
482
483impl PostgresSourceBuilder {
484 pub fn new(id: impl Into<String>) -> Self {
486 Self {
487 id: id.into(),
488 host: "localhost".to_string(),
489 port: 5432,
490 database: String::new(),
491 user: String::new(),
492 password: String::new(),
493 tables: Vec::new(),
494 slot_name: "drasi_slot".to_string(),
495 publication_name: "drasi_publication".to_string(),
496 ssl_mode: SslMode::default(),
497 table_keys: Vec::new(),
498 dispatch_mode: None,
499 dispatch_buffer_capacity: None,
500 bootstrap_provider: None,
501 auto_start: true,
502 }
503 }
504
505 pub fn with_host(mut self, host: impl Into<String>) -> Self {
507 self.host = host.into();
508 self
509 }
510
511 pub fn with_port(mut self, port: u16) -> Self {
513 self.port = port;
514 self
515 }
516
517 pub fn with_database(mut self, database: impl Into<String>) -> Self {
519 self.database = database.into();
520 self
521 }
522
523 pub fn with_user(mut self, user: impl Into<String>) -> Self {
525 self.user = user.into();
526 self
527 }
528
529 pub fn with_password(mut self, password: impl Into<String>) -> Self {
531 self.password = password.into();
532 self
533 }
534
535 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
537 self.tables = tables;
538 self
539 }
540
541 pub fn add_table(mut self, table: impl Into<String>) -> Self {
543 self.tables.push(table.into());
544 self
545 }
546
547 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
549 self.slot_name = slot_name.into();
550 self
551 }
552
553 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
555 self.publication_name = publication_name.into();
556 self
557 }
558
559 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
561 self.ssl_mode = ssl_mode;
562 self
563 }
564
565 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
567 self.table_keys = table_keys;
568 self
569 }
570
571 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
573 self.table_keys.push(table_key);
574 self
575 }
576
577 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
579 self.dispatch_mode = Some(mode);
580 self
581 }
582
583 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
585 self.dispatch_buffer_capacity = Some(capacity);
586 self
587 }
588
589 pub fn with_bootstrap_provider(
591 mut self,
592 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
593 ) -> Self {
594 self.bootstrap_provider = Some(Box::new(provider));
595 self
596 }
597
598 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
603 self.auto_start = auto_start;
604 self
605 }
606
607 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
609 self.host = config.host;
610 self.port = config.port;
611 self.database = config.database;
612 self.user = config.user;
613 self.password = config.password;
614 self.tables = config.tables;
615 self.slot_name = config.slot_name;
616 self.publication_name = config.publication_name;
617 self.ssl_mode = config.ssl_mode;
618 self.table_keys = config.table_keys;
619 self
620 }
621
622 pub fn build(self) -> Result<PostgresReplicationSource> {
628 let config = PostgresSourceConfig {
629 host: self.host,
630 port: self.port,
631 database: self.database,
632 user: self.user,
633 password: self.password,
634 tables: self.tables,
635 slot_name: self.slot_name,
636 publication_name: self.publication_name,
637 ssl_mode: self.ssl_mode,
638 table_keys: self.table_keys,
639 };
640
641 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
642 if let Some(mode) = self.dispatch_mode {
643 params = params.with_dispatch_mode(mode);
644 }
645 if let Some(capacity) = self.dispatch_buffer_capacity {
646 params = params.with_dispatch_buffer_capacity(capacity);
647 }
648 if let Some(provider) = self.bootstrap_provider {
649 params = params.with_bootstrap_provider(provider);
650 }
651
652 Ok(PostgresReplicationSource {
653 base: SourceBase::new(params)?,
654 config,
655 })
656 }
657}
658
659#[cfg(test)]
660mod tests {
661 use super::*;
662
663 mod construction {
664 use super::*;
665
666 #[test]
667 fn test_builder_with_valid_config() {
668 let source = PostgresSourceBuilder::new("test-source")
669 .with_database("testdb")
670 .with_user("testuser")
671 .build();
672 assert!(source.is_ok());
673 }
674
675 #[test]
676 fn test_builder_with_custom_config() {
677 let source = PostgresSourceBuilder::new("pg-source")
678 .with_host("192.168.1.100")
679 .with_port(5433)
680 .with_database("production")
681 .with_user("admin")
682 .with_password("secret")
683 .build()
684 .unwrap();
685 assert_eq!(source.id(), "pg-source");
686 }
687
688 #[test]
689 fn test_with_dispatch_creates_source() {
690 let config = PostgresSourceConfig {
691 host: "localhost".to_string(),
692 port: 5432,
693 database: "testdb".to_string(),
694 user: "testuser".to_string(),
695 password: String::new(),
696 tables: Vec::new(),
697 slot_name: "drasi_slot".to_string(),
698 publication_name: "drasi_publication".to_string(),
699 ssl_mode: SslMode::default(),
700 table_keys: Vec::new(),
701 };
702 let source = PostgresReplicationSource::with_dispatch(
703 "dispatch-source",
704 config,
705 Some(DispatchMode::Channel),
706 Some(2000),
707 );
708 assert!(source.is_ok());
709 assert_eq!(source.unwrap().id(), "dispatch-source");
710 }
711 }
712
713 mod properties {
714 use super::*;
715
716 #[test]
717 fn test_id_returns_correct_value() {
718 let source = PostgresSourceBuilder::new("my-pg-source")
719 .with_database("db")
720 .with_user("user")
721 .build()
722 .unwrap();
723 assert_eq!(source.id(), "my-pg-source");
724 }
725
726 #[test]
727 fn test_type_name_returns_postgres() {
728 let source = PostgresSourceBuilder::new("test")
729 .with_database("db")
730 .with_user("user")
731 .build()
732 .unwrap();
733 assert_eq!(source.type_name(), "postgres");
734 }
735
736 #[test]
737 fn test_properties_contains_connection_info() {
738 let source = PostgresSourceBuilder::new("test")
739 .with_host("db.example.com")
740 .with_port(5433)
741 .with_database("mydb")
742 .with_user("app_user")
743 .with_password("secret")
744 .with_tables(vec!["users".to_string()])
745 .build()
746 .unwrap();
747 let props = source.properties();
748
749 assert_eq!(
750 props.get("host"),
751 Some(&serde_json::Value::String("db.example.com".to_string()))
752 );
753 assert_eq!(
754 props.get("port"),
755 Some(&serde_json::Value::Number(5433.into()))
756 );
757 assert_eq!(
758 props.get("database"),
759 Some(&serde_json::Value::String("mydb".to_string()))
760 );
761 assert_eq!(
762 props.get("user"),
763 Some(&serde_json::Value::String("app_user".to_string()))
764 );
765 }
766
767 #[test]
768 fn test_properties_includes_password() {
769 let source = PostgresSourceBuilder::new("test")
770 .with_database("db")
771 .with_user("user")
772 .with_password("super_secret_password")
773 .build()
774 .unwrap();
775 let props = source.properties();
776
777 assert_eq!(
779 props.get("password"),
780 Some(&serde_json::Value::String(
781 "super_secret_password".to_string()
782 ))
783 );
784 }
785
786 #[test]
787 fn test_properties_includes_tables() {
788 let source = PostgresSourceBuilder::new("test")
789 .with_database("db")
790 .with_user("user")
791 .with_tables(vec!["users".to_string(), "orders".to_string()])
792 .build()
793 .unwrap();
794 let props = source.properties();
795
796 let tables = props.get("tables").unwrap().as_array().unwrap();
797 assert_eq!(tables.len(), 2);
798 assert_eq!(tables[0], "users");
799 assert_eq!(tables[1], "orders");
800 }
801 }
802
803 mod lifecycle {
804 use super::*;
805
806 struct TestSecretResolver;
808
809 impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
810 fn resolve_to_string(
811 &self,
812 value: &drasi_plugin_sdk::ConfigValue<String>,
813 ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
814 match value {
815 drasi_plugin_sdk::ConfigValue::Secret { name } => {
816 Ok(format!("resolved-secret-{name}"))
817 }
818 _ => Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType),
819 }
820 }
821 }
822
823 fn ensure_test_secret_resolver() {
824 let _ = drasi_plugin_sdk::resolver::register_secret_resolver(std::sync::Arc::new(
825 TestSecretResolver,
826 ));
827 }
828
829 #[tokio::test]
830 async fn test_descriptor_preserves_secret_envelope() {
831 use crate::descriptor::PostgresSourceDescriptor;
832 use drasi_lib::sources::Source;
833 use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;
834
835 ensure_test_secret_resolver();
836
837 let config_json = serde_json::json!({
838 "host": "db.example.com",
839 "port": 5432,
840 "database": "mydb",
841 "user": "app_user",
842 "password": {
843 "kind": "Secret",
844 "name": "db-password"
845 },
846 "tables": ["users"],
847 "slotName": "drasi_slot",
848 "publicationName": "drasi_pub"
849 });
850
851 let descriptor = PostgresSourceDescriptor;
852 let source = descriptor
853 .create_source("pg-secret-test", &config_json, true)
854 .await
855 .expect("descriptor should create source");
856
857 let props = source.properties();
858
859 let password = props.get("password").expect("password must be present");
861 assert!(
862 password.is_object(),
863 "password should be Secret envelope, got: {password}"
864 );
865 assert_eq!(
866 password.get("kind").and_then(|v| v.as_str()),
867 Some("Secret"),
868 "envelope kind must be Secret"
869 );
870 assert_eq!(
871 password.get("name").and_then(|v| v.as_str()),
872 Some("db-password"),
873 "secret name must be preserved"
874 );
875
876 let props_str = serde_json::to_string(&props).unwrap();
878 assert!(
879 !props_str.contains("resolved-secret-db-password"),
880 "resolved secret must not appear in properties"
881 );
882
883 assert!(
885 props.contains_key("slotName"),
886 "expected camelCase 'slotName', got keys: {:?}",
887 props.keys().collect::<Vec<_>>()
888 );
889 assert!(
890 props.contains_key("publicationName"),
891 "expected camelCase 'publicationName'"
892 );
893 }
894
895 #[tokio::test]
896 async fn test_initial_status_is_stopped() {
897 let source = PostgresSourceBuilder::new("test")
898 .with_database("db")
899 .with_user("user")
900 .build()
901 .unwrap();
902 assert_eq!(source.status().await, ComponentStatus::Stopped);
903 }
904
905 #[test]
906 fn test_builder_fallback_produces_camel_case() {
907 use drasi_lib::sources::Source;
908
909 let source = PostgresSourceBuilder::new("pg-fallback")
910 .with_host("myhost.example.com")
911 .with_port(5433)
912 .with_database("mydb")
913 .with_user("admin")
914 .with_password("secret123")
915 .with_ssl_mode(SslMode::Require)
916 .with_slot_name("custom_slot")
917 .with_publication_name("custom_pub")
918 .build()
919 .unwrap();
920
921 let props = source.properties();
922
923 assert!(
925 props.contains_key("slotName"),
926 "expected camelCase 'slotName', got keys: {:?}",
927 props.keys().collect::<Vec<_>>()
928 );
929 assert!(
930 props.contains_key("publicationName"),
931 "expected camelCase 'publicationName'"
932 );
933 assert!(
934 props.contains_key("sslMode"),
935 "expected camelCase 'sslMode'"
936 );
937
938 assert!(
940 !props.contains_key("slot_name"),
941 "should not have snake_case 'slot_name'"
942 );
943 assert!(
944 !props.contains_key("publication_name"),
945 "should not have snake_case 'publication_name'"
946 );
947
948 assert_eq!(
950 props.get("host").and_then(|v| v.as_str()),
951 Some("myhost.example.com")
952 );
953 assert_eq!(props.get("port").and_then(|v| v.as_u64()), Some(5433));
954 assert_eq!(props.get("database").and_then(|v| v.as_str()), Some("mydb"));
955 assert_eq!(
956 props.get("password").and_then(|v| v.as_str()),
957 Some("secret123")
958 );
959 }
960 }
961
962 mod builder {
963 use super::*;
964
965 #[test]
966 fn test_postgres_builder_defaults() {
967 let source = PostgresSourceBuilder::new("test").build().unwrap();
968 assert_eq!(source.config.host, "localhost");
969 assert_eq!(source.config.port, 5432);
970 assert_eq!(source.config.slot_name, "drasi_slot");
971 assert_eq!(source.config.publication_name, "drasi_publication");
972 }
973
974 #[test]
975 fn test_postgres_builder_custom_values() {
976 let source = PostgresSourceBuilder::new("test")
977 .with_host("db.example.com")
978 .with_port(5433)
979 .with_database("production")
980 .with_user("app_user")
981 .with_password("secret")
982 .with_tables(vec!["users".to_string(), "orders".to_string()])
983 .build()
984 .unwrap();
985
986 assert_eq!(source.config.host, "db.example.com");
987 assert_eq!(source.config.port, 5433);
988 assert_eq!(source.config.database, "production");
989 assert_eq!(source.config.user, "app_user");
990 assert_eq!(source.config.password, "secret");
991 assert_eq!(source.config.tables.len(), 2);
992 assert_eq!(source.config.tables[0], "users");
993 assert_eq!(source.config.tables[1], "orders");
994 }
995
996 #[test]
997 fn test_builder_add_table() {
998 let source = PostgresSourceBuilder::new("test")
999 .add_table("table1")
1000 .add_table("table2")
1001 .add_table("table3")
1002 .build()
1003 .unwrap();
1004
1005 assert_eq!(source.config.tables.len(), 3);
1006 assert_eq!(source.config.tables[0], "table1");
1007 assert_eq!(source.config.tables[1], "table2");
1008 assert_eq!(source.config.tables[2], "table3");
1009 }
1010
1011 #[test]
1012 fn test_builder_slot_and_publication() {
1013 let source = PostgresSourceBuilder::new("test")
1014 .with_slot_name("custom_slot")
1015 .with_publication_name("custom_pub")
1016 .build()
1017 .unwrap();
1018
1019 assert_eq!(source.config.slot_name, "custom_slot");
1020 assert_eq!(source.config.publication_name, "custom_pub");
1021 }
1022
1023 #[test]
1024 fn test_builder_id() {
1025 let source = PostgresReplicationSource::builder("my-pg-source")
1026 .with_database("db")
1027 .with_user("user")
1028 .build()
1029 .unwrap();
1030
1031 assert_eq!(source.base.id, "my-pg-source");
1032 }
1033 }
1034
1035 mod config {
1036 use super::*;
1037
1038 #[test]
1039 fn test_config_serialization() {
1040 let config = PostgresSourceConfig {
1041 host: "localhost".to_string(),
1042 port: 5432,
1043 database: "testdb".to_string(),
1044 user: "testuser".to_string(),
1045 password: String::new(),
1046 tables: Vec::new(),
1047 slot_name: "drasi_slot".to_string(),
1048 publication_name: "drasi_publication".to_string(),
1049 ssl_mode: SslMode::default(),
1050 table_keys: Vec::new(),
1051 };
1052
1053 let json = serde_json::to_string(&config).unwrap();
1054 let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
1055
1056 assert_eq!(config, deserialized);
1057 }
1058
1059 #[test]
1060 fn test_config_deserialization_with_required_fields() {
1061 let json = r#"{
1062 "database": "mydb",
1063 "user": "myuser"
1064 }"#;
1065 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1066
1067 assert_eq!(config.database, "mydb");
1068 assert_eq!(config.user, "myuser");
1069 assert_eq!(config.host, "localhost"); assert_eq!(config.port, 5432); assert_eq!(config.slot_name, "drasi_slot"); }
1073
1074 #[test]
1075 fn test_config_deserialization_full() {
1076 let json = r#"{
1077 "host": "db.prod.internal",
1078 "port": 5433,
1079 "database": "production",
1080 "user": "replication_user",
1081 "password": "secret",
1082 "tables": ["accounts", "transactions"],
1083 "slot_name": "prod_slot",
1084 "publication_name": "prod_publication"
1085 }"#;
1086 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1087
1088 assert_eq!(config.host, "db.prod.internal");
1089 assert_eq!(config.port, 5433);
1090 assert_eq!(config.database, "production");
1091 assert_eq!(config.user, "replication_user");
1092 assert_eq!(config.password, "secret");
1093 assert_eq!(config.tables, vec!["accounts", "transactions"]);
1094 assert_eq!(config.slot_name, "prod_slot");
1095 assert_eq!(config.publication_name, "prod_publication");
1096 }
1097 }
1098}
1099
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled.
//
// Invokes the SDK's `export_plugin!` macro with the plugin id
// `postgres-source`, a single source descriptor (`PostgresSourceDescriptor`)
// and no reaction or bootstrap descriptors. The core, lib and plugin versions
// are all taken from this crate's `CARGO_PKG_VERSION`.
// NOTE(review): presumably the macro generates the entry points a host uses to
// load this crate dynamically; confirm against the SDK documentation.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);