1#![allow(unexpected_cfgs)]
2pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::{anyhow, Result};
183use async_trait::async_trait;
184use drasi_lib::schema::{
185 normalize_table_label, NodeSchema, PropertySchema, PropertyType, SourceSchema,
186};
187use log::{debug, error, info};
188use postgres_native_tls::MakeTlsConnector;
189use std::collections::HashMap;
190use std::sync::Arc;
191use tokio::sync::RwLock;
192
193use drasi_lib::channels::{DispatchMode, *};
194use drasi_lib::component_graph::ComponentStatusHandle;
195use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
196use drasi_lib::Source;
197use tracing::Instrument;
198
199pub struct PostgresReplicationSource {
209 base: SourceBase,
211 config: PostgresSourceConfig,
213 cached_schema: Arc<std::sync::RwLock<Option<SourceSchema>>>,
215}
216
217fn postgres_type_to_property_type(data_type: &str) -> Option<PropertyType> {
218 match data_type {
219 "smallint" | "integer" | "bigint" => Some(PropertyType::Integer),
220 "real" | "double precision" | "numeric" | "decimal" => Some(PropertyType::Float),
221 "boolean" => Some(PropertyType::Boolean),
222 "timestamp without time zone"
223 | "timestamp with time zone"
224 | "date"
225 | "time without time zone"
226 | "time with time zone" => Some(PropertyType::Timestamp),
227 "json" | "jsonb" => Some(PropertyType::Json),
228 "character" | "character varying" | "text" | "uuid" | "bytea" => Some(PropertyType::String),
229 _ => None,
230 }
231}
232
233async fn introspect_postgres_schema(config: &PostgresSourceConfig) -> Result<Option<SourceSchema>> {
234 if config.tables.is_empty() {
235 return Ok(None);
236 }
237
238 let mut pg_config = tokio_postgres::Config::new();
239 pg_config.host(&config.host);
240 pg_config.port(config.port);
241 pg_config.dbname(&config.database);
242 pg_config.user(&config.user);
243 if !config.password.is_empty() {
244 pg_config.password(&config.password);
245 }
246
247 let client = match config.ssl_mode {
248 SslMode::Require => {
249 pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
250 let tls_connector = native_tls::TlsConnector::builder()
251 .danger_accept_invalid_hostnames(false)
252 .danger_accept_invalid_certs(false)
253 .build()
254 .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
255 let connector = MakeTlsConnector::new(tls_connector);
256
257 debug!("Schema introspection: connecting with SSL (require)");
258 let (client, connection) = pg_config.connect(connector).await?;
259 tokio::spawn(async move {
260 if let Err(e) = connection.await {
261 log::warn!("PostgreSQL schema introspection connection closed: {e}");
262 }
263 });
264 client
265 }
266 SslMode::Prefer => {
267 let tls_connector = native_tls::TlsConnector::builder()
269 .danger_accept_invalid_hostnames(false)
270 .danger_accept_invalid_certs(false)
271 .build()
272 .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
273 let connector = MakeTlsConnector::new(tls_connector);
274
275 pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
276 debug!("Schema introspection: connecting with SSL (prefer)");
277 let (client, connection) = pg_config.connect(connector).await?;
278 tokio::spawn(async move {
279 if let Err(e) = connection.await {
280 log::warn!("PostgreSQL schema introspection connection closed: {e}");
281 }
282 });
283 client
284 }
285 SslMode::Disable => {
286 debug!("Schema introspection: connecting without SSL");
287 let (client, connection) = pg_config.connect(tokio_postgres::NoTls).await?;
288 tokio::spawn(async move {
289 if let Err(e) = connection.await {
290 log::warn!("PostgreSQL schema introspection connection closed: {e}");
291 }
292 });
293 client
294 }
295 };
296
297 let mut nodes = Vec::new();
298
299 for table in &config.tables {
300 let (schema_name, table_name) = table
301 .split_once('.')
302 .map(|(schema, name)| (schema.to_string(), name.to_string()))
303 .unwrap_or_else(|| ("public".to_string(), table.to_string()));
304
305 let rows = client
306 .query(
307 "SELECT column_name, data_type \
308 FROM information_schema.columns \
309 WHERE table_schema = $1 AND table_name = $2 \
310 ORDER BY ordinal_position",
311 &[&schema_name, &table_name],
312 )
313 .await?;
314
315 let properties = rows
316 .into_iter()
317 .map(|row| PropertySchema {
318 name: row.get::<_, String>(0),
319 data_type: postgres_type_to_property_type(&row.get::<_, String>(1)),
320 description: None,
321 })
322 .collect();
323
324 nodes.push(NodeSchema {
325 label: normalize_table_label(&table_name),
326 properties,
327 });
328 }
329
330 Ok(Some(SourceSchema {
331 nodes,
332 relations: Vec::new(),
333 }))
334}
335
336impl PostgresReplicationSource {
337 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
354 PostgresSourceBuilder::new(id)
355 }
356
357 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
389 let id = id.into();
390 let params = SourceBaseParams::new(id);
391 Ok(Self {
392 base: SourceBase::new(params)?,
393 config,
394 cached_schema: Arc::new(std::sync::RwLock::new(None)),
395 })
396 }
397
398 pub fn with_dispatch(
403 id: impl Into<String>,
404 config: PostgresSourceConfig,
405 dispatch_mode: Option<DispatchMode>,
406 dispatch_buffer_capacity: Option<usize>,
407 ) -> Result<Self> {
408 let id = id.into();
409 let mut params = SourceBaseParams::new(id);
410 if let Some(mode) = dispatch_mode {
411 params = params.with_dispatch_mode(mode);
412 }
413 if let Some(capacity) = dispatch_buffer_capacity {
414 params = params.with_dispatch_buffer_capacity(capacity);
415 }
416 Ok(Self {
417 base: SourceBase::new(params)?,
418 config,
419 cached_schema: Arc::new(std::sync::RwLock::new(None)),
420 })
421 }
422}
423
424#[async_trait]
425impl Source for PostgresReplicationSource {
426 fn id(&self) -> &str {
427 &self.base.id
428 }
429
430 fn type_name(&self) -> &str {
431 "postgres"
432 }
433
434 fn properties(&self) -> HashMap<String, serde_json::Value> {
435 use crate::descriptor::PostgresSourceConfigDto;
436
437 self.base
438 .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
439 }
440
441 fn auto_start(&self) -> bool {
442 self.base.get_auto_start()
443 }
444
445 fn describe_schema(&self) -> Option<SourceSchema> {
446 self.cached_schema
447 .read()
448 .ok()
449 .and_then(|schema| schema.clone())
450 .or_else(|| {
451 if self.config.tables.is_empty() {
452 None
453 } else {
454 Some(SourceSchema {
455 nodes: self
456 .config
457 .tables
458 .iter()
459 .map(|table| NodeSchema::new(normalize_table_label(table)))
460 .collect(),
461 relations: Vec::new(),
462 })
463 }
464 })
465 }
466
467 async fn start(&self) -> Result<()> {
468 if self.base.get_status().await == ComponentStatus::Running {
469 return Ok(());
470 }
471
472 self.base.set_status(ComponentStatus::Starting, None).await;
473 info!("Starting PostgreSQL replication source: {}", self.base.id);
474
475 match introspect_postgres_schema(&self.config).await {
476 Ok(Some(schema)) => {
477 if let Ok(mut cached) = self.cached_schema.write() {
478 *cached = Some(schema);
479 }
480 }
481 Ok(None) => {}
482 Err(e) => {
483 log::warn!(
484 "Failed to introspect PostgreSQL schema for '{}': {e}",
485 self.base.id
486 );
487 }
488 }
489
490 let config = self.config.clone();
491 let source_id = self.base.id.clone();
492 let dispatchers = self.base.dispatchers.clone();
493 let reporter = self.base.status_handle();
494
495 let instance_id = self
497 .base
498 .context()
499 .await
500 .map(|c| c.instance_id)
501 .unwrap_or_default();
502
503 let source_id_for_span = source_id.clone();
505 let span = tracing::info_span!(
506 "postgres_replication_task",
507 instance_id = %instance_id,
508 component_id = %source_id_for_span,
509 component_type = "source"
510 );
511
512 let task = tokio::spawn(
513 async move {
514 if let Err(e) =
515 run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
516 {
517 error!("Replication task failed for {source_id}: {e}");
518 reporter
519 .set_status(
520 ComponentStatus::Error,
521 Some(format!("Replication failed: {e}")),
522 )
523 .await;
524 }
525 }
526 .instrument(span),
527 );
528
529 *self.base.task_handle.write().await = Some(task);
530 self.base
531 .set_status(
532 ComponentStatus::Running,
533 Some("PostgreSQL replication started".to_string()),
534 )
535 .await;
536
537 Ok(())
538 }
539
540 async fn stop(&self) -> Result<()> {
541 if self.base.get_status().await != ComponentStatus::Running {
542 return Ok(());
543 }
544
545 info!("Stopping PostgreSQL replication source: {}", self.base.id);
546
547 self.base.set_status(ComponentStatus::Stopping, None).await;
548
549 if let Some(task) = self.base.task_handle.write().await.take() {
551 task.abort();
552 }
553
554 if let Ok(mut cached) = self.cached_schema.write() {
556 *cached = None;
557 }
558
559 self.base
560 .set_status(
561 ComponentStatus::Stopped,
562 Some("PostgreSQL replication stopped".to_string()),
563 )
564 .await;
565
566 Ok(())
567 }
568
569 async fn status(&self) -> ComponentStatus {
570 self.base.get_status().await
571 }
572
573 async fn subscribe(
574 &self,
575 settings: drasi_lib::config::SourceSubscriptionSettings,
576 ) -> Result<SubscriptionResponse> {
577 self.base
578 .subscribe_with_bootstrap(&settings, "PostgreSQL")
579 .await
580 }
581
582 fn as_any(&self) -> &dyn std::any::Any {
583 self
584 }
585
586 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
587 self.base.initialize(context).await;
588 }
589
590 async fn set_bootstrap_provider(
591 &self,
592 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
593 ) {
594 self.base.set_bootstrap_provider(provider).await;
595 }
596}
597
598async fn run_replication(
599 source_id: String,
600 config: PostgresSourceConfig,
601 dispatchers: Arc<
602 RwLock<
603 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
604 >,
605 >,
606 status_handle: ComponentStatusHandle,
607) -> Result<()> {
608 info!("Starting replication for source {source_id}");
609
610 let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
611
612 stream.run().await
613}
614
615pub struct PostgresSourceBuilder {
635 id: String,
636 host: String,
637 port: u16,
638 database: String,
639 user: String,
640 password: String,
641 tables: Vec<String>,
642 slot_name: String,
643 publication_name: String,
644 ssl_mode: SslMode,
645 table_keys: Vec<TableKeyConfig>,
646 dispatch_mode: Option<DispatchMode>,
647 dispatch_buffer_capacity: Option<usize>,
648 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
649 auto_start: bool,
650}
651
652impl PostgresSourceBuilder {
653 pub fn new(id: impl Into<String>) -> Self {
655 Self {
656 id: id.into(),
657 host: "localhost".to_string(),
658 port: 5432,
659 database: String::new(),
660 user: String::new(),
661 password: String::new(),
662 tables: Vec::new(),
663 slot_name: "drasi_slot".to_string(),
664 publication_name: "drasi_publication".to_string(),
665 ssl_mode: SslMode::default(),
666 table_keys: Vec::new(),
667 dispatch_mode: None,
668 dispatch_buffer_capacity: None,
669 bootstrap_provider: None,
670 auto_start: true,
671 }
672 }
673
674 pub fn with_host(mut self, host: impl Into<String>) -> Self {
676 self.host = host.into();
677 self
678 }
679
680 pub fn with_port(mut self, port: u16) -> Self {
682 self.port = port;
683 self
684 }
685
686 pub fn with_database(mut self, database: impl Into<String>) -> Self {
688 self.database = database.into();
689 self
690 }
691
692 pub fn with_user(mut self, user: impl Into<String>) -> Self {
694 self.user = user.into();
695 self
696 }
697
698 pub fn with_password(mut self, password: impl Into<String>) -> Self {
700 self.password = password.into();
701 self
702 }
703
704 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
706 self.tables = tables;
707 self
708 }
709
710 pub fn add_table(mut self, table: impl Into<String>) -> Self {
712 self.tables.push(table.into());
713 self
714 }
715
716 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
718 self.slot_name = slot_name.into();
719 self
720 }
721
722 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
724 self.publication_name = publication_name.into();
725 self
726 }
727
728 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
730 self.ssl_mode = ssl_mode;
731 self
732 }
733
734 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
736 self.table_keys = table_keys;
737 self
738 }
739
740 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
742 self.table_keys.push(table_key);
743 self
744 }
745
746 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
748 self.dispatch_mode = Some(mode);
749 self
750 }
751
752 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
754 self.dispatch_buffer_capacity = Some(capacity);
755 self
756 }
757
758 pub fn with_bootstrap_provider(
760 mut self,
761 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
762 ) -> Self {
763 self.bootstrap_provider = Some(Box::new(provider));
764 self
765 }
766
767 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
772 self.auto_start = auto_start;
773 self
774 }
775
776 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
778 self.host = config.host;
779 self.port = config.port;
780 self.database = config.database;
781 self.user = config.user;
782 self.password = config.password;
783 self.tables = config.tables;
784 self.slot_name = config.slot_name;
785 self.publication_name = config.publication_name;
786 self.ssl_mode = config.ssl_mode;
787 self.table_keys = config.table_keys;
788 self
789 }
790
791 pub fn build(self) -> Result<PostgresReplicationSource> {
797 let config = PostgresSourceConfig {
798 host: self.host,
799 port: self.port,
800 database: self.database,
801 user: self.user,
802 password: self.password,
803 tables: self.tables,
804 slot_name: self.slot_name,
805 publication_name: self.publication_name,
806 ssl_mode: self.ssl_mode,
807 table_keys: self.table_keys,
808 };
809
810 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
811 if let Some(mode) = self.dispatch_mode {
812 params = params.with_dispatch_mode(mode);
813 }
814 if let Some(capacity) = self.dispatch_buffer_capacity {
815 params = params.with_dispatch_buffer_capacity(capacity);
816 }
817 if let Some(provider) = self.bootstrap_provider {
818 params = params.with_bootstrap_provider(provider);
819 }
820
821 Ok(PostgresReplicationSource {
822 base: SourceBase::new(params)?,
823 config,
824 cached_schema: Arc::new(std::sync::RwLock::new(None)),
825 })
826 }
827}
828
829#[cfg(test)]
830mod tests {
831 use super::*;
832
833 mod construction {
834 use super::*;
835
836 #[test]
837 fn test_builder_with_valid_config() {
838 let source = PostgresSourceBuilder::new("test-source")
839 .with_database("testdb")
840 .with_user("testuser")
841 .build();
842 assert!(source.is_ok());
843 }
844
845 #[test]
846 fn test_builder_with_custom_config() {
847 let source = PostgresSourceBuilder::new("pg-source")
848 .with_host("192.168.1.100")
849 .with_port(5433)
850 .with_database("production")
851 .with_user("admin")
852 .with_password("secret")
853 .build()
854 .unwrap();
855 assert_eq!(source.id(), "pg-source");
856 }
857
858 #[test]
859 fn test_with_dispatch_creates_source() {
860 let config = PostgresSourceConfig {
861 host: "localhost".to_string(),
862 port: 5432,
863 database: "testdb".to_string(),
864 user: "testuser".to_string(),
865 password: String::new(),
866 tables: Vec::new(),
867 slot_name: "drasi_slot".to_string(),
868 publication_name: "drasi_publication".to_string(),
869 ssl_mode: SslMode::default(),
870 table_keys: Vec::new(),
871 };
872 let source = PostgresReplicationSource::with_dispatch(
873 "dispatch-source",
874 config,
875 Some(DispatchMode::Channel),
876 Some(2000),
877 );
878 assert!(source.is_ok());
879 assert_eq!(source.unwrap().id(), "dispatch-source");
880 }
881 }
882
883 mod properties {
884 use super::*;
885
886 #[test]
887 fn test_id_returns_correct_value() {
888 let source = PostgresSourceBuilder::new("my-pg-source")
889 .with_database("db")
890 .with_user("user")
891 .build()
892 .unwrap();
893 assert_eq!(source.id(), "my-pg-source");
894 }
895
896 #[test]
897 fn test_type_name_returns_postgres() {
898 let source = PostgresSourceBuilder::new("test")
899 .with_database("db")
900 .with_user("user")
901 .build()
902 .unwrap();
903 assert_eq!(source.type_name(), "postgres");
904 }
905
906 #[test]
907 fn test_properties_contains_connection_info() {
908 let source = PostgresSourceBuilder::new("test")
909 .with_host("db.example.com")
910 .with_port(5433)
911 .with_database("mydb")
912 .with_user("app_user")
913 .with_password("secret")
914 .with_tables(vec!["users".to_string()])
915 .build()
916 .unwrap();
917 let props = source.properties();
918
919 assert_eq!(
920 props.get("host"),
921 Some(&serde_json::Value::String("db.example.com".to_string()))
922 );
923 assert_eq!(
924 props.get("port"),
925 Some(&serde_json::Value::Number(5433.into()))
926 );
927 assert_eq!(
928 props.get("database"),
929 Some(&serde_json::Value::String("mydb".to_string()))
930 );
931 assert_eq!(
932 props.get("user"),
933 Some(&serde_json::Value::String("app_user".to_string()))
934 );
935 }
936
937 #[test]
938 fn test_properties_includes_password() {
939 let source = PostgresSourceBuilder::new("test")
940 .with_database("db")
941 .with_user("user")
942 .with_password("super_secret_password")
943 .build()
944 .unwrap();
945 let props = source.properties();
946
947 assert_eq!(
949 props.get("password"),
950 Some(&serde_json::Value::String(
951 "super_secret_password".to_string()
952 ))
953 );
954 }
955
956 #[test]
957 fn test_properties_includes_tables() {
958 let source = PostgresSourceBuilder::new("test")
959 .with_database("db")
960 .with_user("user")
961 .with_tables(vec!["users".to_string(), "orders".to_string()])
962 .build()
963 .unwrap();
964 let props = source.properties();
965
966 let tables = props.get("tables").unwrap().as_array().unwrap();
967 assert_eq!(tables.len(), 2);
968 assert_eq!(tables[0], "users");
969 assert_eq!(tables[1], "orders");
970 }
971
972 #[test]
973 fn test_describe_schema_falls_back_to_configured_tables() {
974 let source = PostgresSourceBuilder::new("test")
975 .with_database("db")
976 .with_user("user")
977 .with_tables(vec!["public.users".to_string(), "orders".to_string()])
978 .build()
979 .unwrap();
980
981 let schema = source
982 .describe_schema()
983 .expect("configured postgres tables should produce fallback schema");
984
985 assert_eq!(schema.nodes.len(), 2);
986 assert!(schema.nodes.iter().any(|node| node.label == "users"));
987 assert!(schema.nodes.iter().any(|node| node.label == "orders"));
988 }
989
990 #[test]
991 fn test_postgres_type_to_property_type_integer() {
992 assert_eq!(
993 postgres_type_to_property_type("integer"),
994 Some(PropertyType::Integer)
995 );
996 assert_eq!(
997 postgres_type_to_property_type("bigint"),
998 Some(PropertyType::Integer)
999 );
1000 assert_eq!(
1001 postgres_type_to_property_type("smallint"),
1002 Some(PropertyType::Integer)
1003 );
1004 }
1005
1006 #[test]
1007 fn test_postgres_type_to_property_type_float() {
1008 assert_eq!(
1009 postgres_type_to_property_type("double precision"),
1010 Some(PropertyType::Float)
1011 );
1012 assert_eq!(
1013 postgres_type_to_property_type("real"),
1014 Some(PropertyType::Float)
1015 );
1016 assert_eq!(
1017 postgres_type_to_property_type("numeric"),
1018 Some(PropertyType::Float)
1019 );
1020 assert_eq!(
1021 postgres_type_to_property_type("decimal"),
1022 Some(PropertyType::Float)
1023 );
1024 }
1025
1026 #[test]
1027 fn test_postgres_type_to_property_type_boolean() {
1028 assert_eq!(
1029 postgres_type_to_property_type("boolean"),
1030 Some(PropertyType::Boolean)
1031 );
1032 }
1033
1034 #[test]
1035 fn test_postgres_type_to_property_type_timestamp() {
1036 assert_eq!(
1037 postgres_type_to_property_type("timestamp with time zone"),
1038 Some(PropertyType::Timestamp)
1039 );
1040 assert_eq!(
1041 postgres_type_to_property_type("timestamp without time zone"),
1042 Some(PropertyType::Timestamp)
1043 );
1044 assert_eq!(
1045 postgres_type_to_property_type("date"),
1046 Some(PropertyType::Timestamp)
1047 );
1048 }
1049
1050 #[test]
1051 fn test_postgres_type_to_property_type_json() {
1052 assert_eq!(
1053 postgres_type_to_property_type("json"),
1054 Some(PropertyType::Json)
1055 );
1056 assert_eq!(
1057 postgres_type_to_property_type("jsonb"),
1058 Some(PropertyType::Json)
1059 );
1060 }
1061
1062 #[test]
1063 fn test_postgres_type_to_property_type_string() {
1064 assert_eq!(
1065 postgres_type_to_property_type("character varying"),
1066 Some(PropertyType::String)
1067 );
1068 assert_eq!(
1069 postgres_type_to_property_type("text"),
1070 Some(PropertyType::String)
1071 );
1072 assert_eq!(
1073 postgres_type_to_property_type("uuid"),
1074 Some(PropertyType::String)
1075 );
1076 }
1077
1078 #[test]
1079 fn test_postgres_type_to_property_type_unknown_returns_none() {
1080 assert_eq!(postgres_type_to_property_type("point"), None);
1081 assert_eq!(postgres_type_to_property_type("polygon"), None);
1082 assert_eq!(postgres_type_to_property_type("cidr"), None);
1083 }
1084 }
1085
1086 mod lifecycle {
1087 use super::*;
1088
1089 struct TestSecretResolver;
1091
1092 impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
1093 fn resolve_to_string(
1094 &self,
1095 value: &drasi_plugin_sdk::ConfigValue<String>,
1096 ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
1097 match value {
1098 drasi_plugin_sdk::ConfigValue::Secret { name } => {
1099 Ok(format!("resolved-secret-{name}"))
1100 }
1101 _ => Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType),
1102 }
1103 }
1104 }
1105
1106 fn ensure_test_secret_resolver() {
1107 let _ = drasi_plugin_sdk::resolver::register_secret_resolver(std::sync::Arc::new(
1108 TestSecretResolver,
1109 ));
1110 }
1111
1112 #[tokio::test]
1113 async fn test_descriptor_preserves_secret_envelope() {
1114 use crate::descriptor::PostgresSourceDescriptor;
1115 use drasi_lib::sources::Source;
1116 use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;
1117
1118 ensure_test_secret_resolver();
1119
1120 let config_json = serde_json::json!({
1121 "host": "db.example.com",
1122 "port": 5432,
1123 "database": "mydb",
1124 "user": "app_user",
1125 "password": {
1126 "kind": "Secret",
1127 "name": "db-password"
1128 },
1129 "tables": ["users"],
1130 "slotName": "drasi_slot",
1131 "publicationName": "drasi_pub"
1132 });
1133
1134 let descriptor = PostgresSourceDescriptor;
1135 let source = descriptor
1136 .create_source("pg-secret-test", &config_json, true)
1137 .await
1138 .expect("descriptor should create source");
1139
1140 let props = source.properties();
1141
1142 let password = props.get("password").expect("password must be present");
1144 assert!(
1145 password.is_object(),
1146 "password should be Secret envelope, got: {password}"
1147 );
1148 assert_eq!(
1149 password.get("kind").and_then(|v| v.as_str()),
1150 Some("Secret"),
1151 "envelope kind must be Secret"
1152 );
1153 assert_eq!(
1154 password.get("name").and_then(|v| v.as_str()),
1155 Some("db-password"),
1156 "secret name must be preserved"
1157 );
1158
1159 let props_str = serde_json::to_string(&props).unwrap();
1161 assert!(
1162 !props_str.contains("resolved-secret-db-password"),
1163 "resolved secret must not appear in properties"
1164 );
1165
1166 assert!(
1168 props.contains_key("slotName"),
1169 "expected camelCase 'slotName', got keys: {:?}",
1170 props.keys().collect::<Vec<_>>()
1171 );
1172 assert!(
1173 props.contains_key("publicationName"),
1174 "expected camelCase 'publicationName'"
1175 );
1176 }
1177
1178 #[tokio::test]
1179 async fn test_initial_status_is_stopped() {
1180 let source = PostgresSourceBuilder::new("test")
1181 .with_database("db")
1182 .with_user("user")
1183 .build()
1184 .unwrap();
1185 assert_eq!(source.status().await, ComponentStatus::Stopped);
1186 }
1187
1188 #[test]
1189 fn test_builder_fallback_produces_camel_case() {
1190 use drasi_lib::sources::Source;
1191
1192 let source = PostgresSourceBuilder::new("pg-fallback")
1193 .with_host("myhost.example.com")
1194 .with_port(5433)
1195 .with_database("mydb")
1196 .with_user("admin")
1197 .with_password("secret123")
1198 .with_ssl_mode(SslMode::Require)
1199 .with_slot_name("custom_slot")
1200 .with_publication_name("custom_pub")
1201 .build()
1202 .unwrap();
1203
1204 let props = source.properties();
1205
1206 assert!(
1208 props.contains_key("slotName"),
1209 "expected camelCase 'slotName', got keys: {:?}",
1210 props.keys().collect::<Vec<_>>()
1211 );
1212 assert!(
1213 props.contains_key("publicationName"),
1214 "expected camelCase 'publicationName'"
1215 );
1216 assert!(
1217 props.contains_key("sslMode"),
1218 "expected camelCase 'sslMode'"
1219 );
1220
1221 assert!(
1223 !props.contains_key("slot_name"),
1224 "should not have snake_case 'slot_name'"
1225 );
1226 assert!(
1227 !props.contains_key("publication_name"),
1228 "should not have snake_case 'publication_name'"
1229 );
1230
1231 assert_eq!(
1233 props.get("host").and_then(|v| v.as_str()),
1234 Some("myhost.example.com")
1235 );
1236 assert_eq!(props.get("port").and_then(|v| v.as_u64()), Some(5433));
1237 assert_eq!(props.get("database").and_then(|v| v.as_str()), Some("mydb"));
1238 assert_eq!(
1239 props.get("password").and_then(|v| v.as_str()),
1240 Some("secret123")
1241 );
1242 }
1243 }
1244
1245 mod builder {
1246 use super::*;
1247
1248 #[test]
1249 fn test_postgres_builder_defaults() {
1250 let source = PostgresSourceBuilder::new("test").build().unwrap();
1251 assert_eq!(source.config.host, "localhost");
1252 assert_eq!(source.config.port, 5432);
1253 assert_eq!(source.config.slot_name, "drasi_slot");
1254 assert_eq!(source.config.publication_name, "drasi_publication");
1255 }
1256
1257 #[test]
1258 fn test_postgres_builder_custom_values() {
1259 let source = PostgresSourceBuilder::new("test")
1260 .with_host("db.example.com")
1261 .with_port(5433)
1262 .with_database("production")
1263 .with_user("app_user")
1264 .with_password("secret")
1265 .with_tables(vec!["users".to_string(), "orders".to_string()])
1266 .build()
1267 .unwrap();
1268
1269 assert_eq!(source.config.host, "db.example.com");
1270 assert_eq!(source.config.port, 5433);
1271 assert_eq!(source.config.database, "production");
1272 assert_eq!(source.config.user, "app_user");
1273 assert_eq!(source.config.password, "secret");
1274 assert_eq!(source.config.tables.len(), 2);
1275 assert_eq!(source.config.tables[0], "users");
1276 assert_eq!(source.config.tables[1], "orders");
1277 }
1278
1279 #[test]
1280 fn test_builder_add_table() {
1281 let source = PostgresSourceBuilder::new("test")
1282 .add_table("table1")
1283 .add_table("table2")
1284 .add_table("table3")
1285 .build()
1286 .unwrap();
1287
1288 assert_eq!(source.config.tables.len(), 3);
1289 assert_eq!(source.config.tables[0], "table1");
1290 assert_eq!(source.config.tables[1], "table2");
1291 assert_eq!(source.config.tables[2], "table3");
1292 }
1293
1294 #[test]
1295 fn test_builder_slot_and_publication() {
1296 let source = PostgresSourceBuilder::new("test")
1297 .with_slot_name("custom_slot")
1298 .with_publication_name("custom_pub")
1299 .build()
1300 .unwrap();
1301
1302 assert_eq!(source.config.slot_name, "custom_slot");
1303 assert_eq!(source.config.publication_name, "custom_pub");
1304 }
1305
1306 #[test]
1307 fn test_builder_id() {
1308 let source = PostgresReplicationSource::builder("my-pg-source")
1309 .with_database("db")
1310 .with_user("user")
1311 .build()
1312 .unwrap();
1313
1314 assert_eq!(source.base.id, "my-pg-source");
1315 }
1316 }
1317
1318 mod config {
1319 use super::*;
1320
1321 #[test]
1322 fn test_config_serialization() {
1323 let config = PostgresSourceConfig {
1324 host: "localhost".to_string(),
1325 port: 5432,
1326 database: "testdb".to_string(),
1327 user: "testuser".to_string(),
1328 password: String::new(),
1329 tables: Vec::new(),
1330 slot_name: "drasi_slot".to_string(),
1331 publication_name: "drasi_publication".to_string(),
1332 ssl_mode: SslMode::default(),
1333 table_keys: Vec::new(),
1334 };
1335
1336 let json = serde_json::to_string(&config).unwrap();
1337 let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
1338
1339 assert_eq!(config, deserialized);
1340 }
1341
1342 #[test]
1343 fn test_config_deserialization_with_required_fields() {
1344 let json = r#"{
1345 "database": "mydb",
1346 "user": "myuser"
1347 }"#;
1348 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1349
1350 assert_eq!(config.database, "mydb");
1351 assert_eq!(config.user, "myuser");
1352 assert_eq!(config.host, "localhost"); assert_eq!(config.port, 5432); assert_eq!(config.slot_name, "drasi_slot"); }
1356
1357 #[test]
1358 fn test_config_deserialization_full() {
1359 let json = r#"{
1360 "host": "db.prod.internal",
1361 "port": 5433,
1362 "database": "production",
1363 "user": "replication_user",
1364 "password": "secret",
1365 "tables": ["accounts", "transactions"],
1366 "slot_name": "prod_slot",
1367 "publication_name": "prod_publication"
1368 }"#;
1369 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
1370
1371 assert_eq!(config.host, "db.prod.internal");
1372 assert_eq!(config.port, 5433);
1373 assert_eq!(config.database, "production");
1374 assert_eq!(config.user, "replication_user");
1375 assert_eq!(config.password, "secret");
1376 assert_eq!(config.tables, vec!["accounts", "transactions"]);
1377 assert_eq!(config.slot_name, "prod_slot");
1378 assert_eq!(config.publication_name, "prod_publication");
1379 }
1380 }
1381}
1382
1383#[cfg(feature = "dynamic-plugin")]
1387drasi_plugin_sdk::export_plugin!(
1388 plugin_id = "postgres-source",
1389 core_version = env!("CARGO_PKG_VERSION"),
1390 lib_version = env!("CARGO_PKG_VERSION"),
1391 plugin_version = env!("CARGO_PKG_VERSION"),
1392 source_descriptors = [descriptor::PostgresSourceDescriptor],
1393 reaction_descriptors = [],
1394 bootstrap_descriptors = [],
1395);