1#![allow(unexpected_cfgs)]
16
17pub mod config;
173pub mod connection;
174pub mod decoder;
175pub mod descriptor;
176pub mod protocol;
177pub mod scram;
178pub mod stream;
179pub mod types;
180
181pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
182
183use anyhow::{anyhow, Result};
184use async_trait::async_trait;
185use drasi_lib::schema::{
186 normalize_table_label, NodeSchema, PropertySchema, PropertyType, SourceSchema,
187};
188use log::{debug, error, info};
189use postgres_native_tls::MakeTlsConnector;
190use std::collections::HashMap;
191use std::sync::Arc;
192use tokio::sync::RwLock;
193
194use drasi_lib::channels::{DispatchMode, *};
195use drasi_lib::component_graph::ComponentStatusHandle;
196use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
197use drasi_lib::Source;
198use tracing::Instrument;
199
/// PostgreSQL source that runs a replication task and is exposed to the
/// runtime through the [`Source`] trait.
pub struct PostgresReplicationSource {
    /// Shared source plumbing: id, status, dispatchers and the handle of the
    /// spawned replication task.
    base: SourceBase,
    /// Connection and replication settings for this source.
    config: PostgresSourceConfig,
    /// Schema discovered by introspection during `start`; `None` until it has
    /// been populated and reset to `None` again by `stop`.
    cached_schema: Arc<std::sync::RwLock<Option<SourceSchema>>>,
}
217
218fn postgres_type_to_property_type(data_type: &str) -> Option<PropertyType> {
219 match data_type {
220 "smallint" | "integer" | "bigint" => Some(PropertyType::Integer),
221 "real" | "double precision" | "numeric" | "decimal" => Some(PropertyType::Float),
222 "boolean" => Some(PropertyType::Boolean),
223 "timestamp without time zone"
224 | "timestamp with time zone"
225 | "date"
226 | "time without time zone"
227 | "time with time zone" => Some(PropertyType::Timestamp),
228 "json" | "jsonb" => Some(PropertyType::Json),
229 "character" | "character varying" | "text" | "uuid" | "bytea" => Some(PropertyType::String),
230 _ => None,
231 }
232}
233
234async fn introspect_postgres_schema(config: &PostgresSourceConfig) -> Result<Option<SourceSchema>> {
235 if config.tables.is_empty() {
236 return Ok(None);
237 }
238
239 let mut pg_config = tokio_postgres::Config::new();
240 pg_config.host(&config.host);
241 pg_config.port(config.port);
242 pg_config.dbname(&config.database);
243 pg_config.user(&config.user);
244 if !config.password.is_empty() {
245 pg_config.password(&config.password);
246 }
247
248 let client = match config.ssl_mode {
249 SslMode::Require => {
250 pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
251 let tls_connector = native_tls::TlsConnector::builder()
252 .danger_accept_invalid_hostnames(false)
253 .danger_accept_invalid_certs(false)
254 .build()
255 .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
256 let connector = MakeTlsConnector::new(tls_connector);
257
258 debug!("Schema introspection: connecting with SSL (require)");
259 let (client, connection) = pg_config.connect(connector).await?;
260 tokio::spawn(async move {
261 if let Err(e) = connection.await {
262 log::warn!("PostgreSQL schema introspection connection closed: {e}");
263 }
264 });
265 client
266 }
267 SslMode::Prefer => {
268 let tls_connector = native_tls::TlsConnector::builder()
270 .danger_accept_invalid_hostnames(false)
271 .danger_accept_invalid_certs(false)
272 .build()
273 .map_err(|e| anyhow!("Failed to create TLS connector: {e}"))?;
274 let connector = MakeTlsConnector::new(tls_connector);
275
276 pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
277 debug!("Schema introspection: connecting with SSL (prefer)");
278 let (client, connection) = pg_config.connect(connector).await?;
279 tokio::spawn(async move {
280 if let Err(e) = connection.await {
281 log::warn!("PostgreSQL schema introspection connection closed: {e}");
282 }
283 });
284 client
285 }
286 SslMode::Disable => {
287 debug!("Schema introspection: connecting without SSL");
288 let (client, connection) = pg_config.connect(tokio_postgres::NoTls).await?;
289 tokio::spawn(async move {
290 if let Err(e) = connection.await {
291 log::warn!("PostgreSQL schema introspection connection closed: {e}");
292 }
293 });
294 client
295 }
296 };
297
298 let mut nodes = Vec::new();
299
300 for table in &config.tables {
301 let (schema_name, table_name) = table
302 .split_once('.')
303 .map(|(schema, name)| (schema.to_string(), name.to_string()))
304 .unwrap_or_else(|| ("public".to_string(), table.to_string()));
305
306 let rows = client
307 .query(
308 "SELECT column_name, data_type \
309 FROM information_schema.columns \
310 WHERE table_schema = $1 AND table_name = $2 \
311 ORDER BY ordinal_position",
312 &[&schema_name, &table_name],
313 )
314 .await?;
315
316 let properties = rows
317 .into_iter()
318 .map(|row| PropertySchema {
319 name: row.get::<_, String>(0),
320 data_type: postgres_type_to_property_type(&row.get::<_, String>(1)),
321 description: None,
322 })
323 .collect();
324
325 nodes.push(NodeSchema {
326 label: normalize_table_label(&table_name),
327 properties,
328 });
329 }
330
331 Ok(Some(SourceSchema {
332 nodes,
333 relations: Vec::new(),
334 }))
335}
336
337impl PostgresReplicationSource {
338 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
355 PostgresSourceBuilder::new(id)
356 }
357
358 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
390 let id = id.into();
391 let params = SourceBaseParams::new(id);
392 Ok(Self {
393 base: SourceBase::new(params)?,
394 config,
395 cached_schema: Arc::new(std::sync::RwLock::new(None)),
396 })
397 }
398
399 pub fn with_dispatch(
404 id: impl Into<String>,
405 config: PostgresSourceConfig,
406 dispatch_mode: Option<DispatchMode>,
407 dispatch_buffer_capacity: Option<usize>,
408 ) -> Result<Self> {
409 let id = id.into();
410 let mut params = SourceBaseParams::new(id);
411 if let Some(mode) = dispatch_mode {
412 params = params.with_dispatch_mode(mode);
413 }
414 if let Some(capacity) = dispatch_buffer_capacity {
415 params = params.with_dispatch_buffer_capacity(capacity);
416 }
417 Ok(Self {
418 base: SourceBase::new(params)?,
419 config,
420 cached_schema: Arc::new(std::sync::RwLock::new(None)),
421 })
422 }
423}
424
425#[async_trait]
426impl Source for PostgresReplicationSource {
427 fn id(&self) -> &str {
428 &self.base.id
429 }
430
431 fn type_name(&self) -> &str {
432 "postgres"
433 }
434
435 fn properties(&self) -> HashMap<String, serde_json::Value> {
436 use crate::descriptor::PostgresSourceConfigDto;
437
438 self.base
439 .properties_or_serialize(&PostgresSourceConfigDto::from(&self.config))
440 }
441
442 fn auto_start(&self) -> bool {
443 self.base.get_auto_start()
444 }
445
446 fn describe_schema(&self) -> Option<SourceSchema> {
447 self.cached_schema
448 .read()
449 .ok()
450 .and_then(|schema| schema.clone())
451 .or_else(|| {
452 if self.config.tables.is_empty() {
453 None
454 } else {
455 Some(SourceSchema {
456 nodes: self
457 .config
458 .tables
459 .iter()
460 .map(|table| NodeSchema::new(normalize_table_label(table)))
461 .collect(),
462 relations: Vec::new(),
463 })
464 }
465 })
466 }
467
468 async fn start(&self) -> Result<()> {
469 if self.base.get_status().await == ComponentStatus::Running {
470 return Ok(());
471 }
472
473 self.base.set_status(ComponentStatus::Starting, None).await;
474 info!("Starting PostgreSQL replication source: {}", self.base.id);
475
476 match introspect_postgres_schema(&self.config).await {
477 Ok(Some(schema)) => {
478 if let Ok(mut cached) = self.cached_schema.write() {
479 *cached = Some(schema);
480 }
481 }
482 Ok(None) => {}
483 Err(e) => {
484 log::warn!(
485 "Failed to introspect PostgreSQL schema for '{}': {e}",
486 self.base.id
487 );
488 }
489 }
490
491 let config = self.config.clone();
492 let source_id = self.base.id.clone();
493 let dispatchers = self.base.dispatchers.clone();
494 let reporter = self.base.status_handle();
495
496 let instance_id = self
498 .base
499 .context()
500 .await
501 .map(|c| c.instance_id)
502 .unwrap_or_default();
503
504 let source_id_for_span = source_id.clone();
506 let span = tracing::info_span!(
507 "postgres_replication_task",
508 instance_id = %instance_id,
509 component_id = %source_id_for_span,
510 component_type = "source"
511 );
512
513 let task = tokio::spawn(
514 async move {
515 if let Err(e) =
516 run_replication(source_id.clone(), config, dispatchers, reporter.clone()).await
517 {
518 error!("Replication task failed for {source_id}: {e}");
519 reporter
520 .set_status(
521 ComponentStatus::Error,
522 Some(format!("Replication failed: {e}")),
523 )
524 .await;
525 }
526 }
527 .instrument(span),
528 );
529
530 *self.base.task_handle.write().await = Some(task);
531 self.base
532 .set_status(
533 ComponentStatus::Running,
534 Some("PostgreSQL replication started".to_string()),
535 )
536 .await;
537
538 Ok(())
539 }
540
541 async fn stop(&self) -> Result<()> {
542 if self.base.get_status().await != ComponentStatus::Running {
543 return Ok(());
544 }
545
546 info!("Stopping PostgreSQL replication source: {}", self.base.id);
547
548 self.base.set_status(ComponentStatus::Stopping, None).await;
549
550 if let Some(task) = self.base.task_handle.write().await.take() {
552 task.abort();
553 }
554
555 if let Ok(mut cached) = self.cached_schema.write() {
557 *cached = None;
558 }
559
560 self.base
561 .set_status(
562 ComponentStatus::Stopped,
563 Some("PostgreSQL replication stopped".to_string()),
564 )
565 .await;
566
567 Ok(())
568 }
569
570 async fn status(&self) -> ComponentStatus {
571 self.base.get_status().await
572 }
573
574 async fn subscribe(
575 &self,
576 settings: drasi_lib::config::SourceSubscriptionSettings,
577 ) -> Result<SubscriptionResponse> {
578 self.base
579 .subscribe_with_bootstrap(&settings, "PostgreSQL")
580 .await
581 }
582
583 fn as_any(&self) -> &dyn std::any::Any {
584 self
585 }
586
587 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
588 self.base.initialize(context).await;
589 }
590
591 async fn set_bootstrap_provider(
592 &self,
593 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
594 ) {
595 self.base.set_bootstrap_provider(provider).await;
596 }
597}
598
599async fn run_replication(
600 source_id: String,
601 config: PostgresSourceConfig,
602 dispatchers: Arc<
603 RwLock<
604 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
605 >,
606 >,
607 status_handle: ComponentStatusHandle,
608) -> Result<()> {
609 info!("Starting replication for source {source_id}");
610
611 let mut stream = stream::ReplicationStream::new(config, source_id, dispatchers, status_handle);
612
613 stream.run().await
614}
615
/// Fluent builder for [`PostgresReplicationSource`].
///
/// Collects the connection settings, the replication settings and the
/// optional dispatch/bootstrap overrides; `build` turns them into a source.
pub struct PostgresSourceBuilder {
    /// Id of the source to create.
    id: String,
    /// Database server host.
    host: String,
    /// Database server port.
    port: u16,
    /// Name of the database to connect to.
    database: String,
    /// User to connect as.
    user: String,
    /// Password for `user`.
    password: String,
    /// Tables handled by the source; entries may be written as `schema.table`.
    tables: Vec<String>,
    /// Replication slot name.
    slot_name: String,
    /// Publication name.
    publication_name: String,
    /// SSL mode used when connecting.
    ssl_mode: SslMode,
    /// Per-table key configuration.
    table_keys: Vec<TableKeyConfig>,
    /// Dispatch mode override; `None` leaves the base default in place.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity override; `None` leaves the base default in place.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider handed to the source base.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source is marked for automatic start.
    auto_start: bool,
}
652
653impl PostgresSourceBuilder {
654 pub fn new(id: impl Into<String>) -> Self {
656 Self {
657 id: id.into(),
658 host: "localhost".to_string(),
659 port: 5432,
660 database: String::new(),
661 user: String::new(),
662 password: String::new(),
663 tables: Vec::new(),
664 slot_name: "drasi_slot".to_string(),
665 publication_name: "drasi_publication".to_string(),
666 ssl_mode: SslMode::default(),
667 table_keys: Vec::new(),
668 dispatch_mode: None,
669 dispatch_buffer_capacity: None,
670 bootstrap_provider: None,
671 auto_start: true,
672 }
673 }
674
675 pub fn with_host(mut self, host: impl Into<String>) -> Self {
677 self.host = host.into();
678 self
679 }
680
681 pub fn with_port(mut self, port: u16) -> Self {
683 self.port = port;
684 self
685 }
686
687 pub fn with_database(mut self, database: impl Into<String>) -> Self {
689 self.database = database.into();
690 self
691 }
692
693 pub fn with_user(mut self, user: impl Into<String>) -> Self {
695 self.user = user.into();
696 self
697 }
698
699 pub fn with_password(mut self, password: impl Into<String>) -> Self {
701 self.password = password.into();
702 self
703 }
704
705 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
707 self.tables = tables;
708 self
709 }
710
711 pub fn add_table(mut self, table: impl Into<String>) -> Self {
713 self.tables.push(table.into());
714 self
715 }
716
717 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
719 self.slot_name = slot_name.into();
720 self
721 }
722
723 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
725 self.publication_name = publication_name.into();
726 self
727 }
728
729 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
731 self.ssl_mode = ssl_mode;
732 self
733 }
734
735 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
737 self.table_keys = table_keys;
738 self
739 }
740
741 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
743 self.table_keys.push(table_key);
744 self
745 }
746
747 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
749 self.dispatch_mode = Some(mode);
750 self
751 }
752
753 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
755 self.dispatch_buffer_capacity = Some(capacity);
756 self
757 }
758
759 pub fn with_bootstrap_provider(
761 mut self,
762 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
763 ) -> Self {
764 self.bootstrap_provider = Some(Box::new(provider));
765 self
766 }
767
768 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
773 self.auto_start = auto_start;
774 self
775 }
776
777 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
779 self.host = config.host;
780 self.port = config.port;
781 self.database = config.database;
782 self.user = config.user;
783 self.password = config.password;
784 self.tables = config.tables;
785 self.slot_name = config.slot_name;
786 self.publication_name = config.publication_name;
787 self.ssl_mode = config.ssl_mode;
788 self.table_keys = config.table_keys;
789 self
790 }
791
792 pub fn build(self) -> Result<PostgresReplicationSource> {
798 let config = PostgresSourceConfig {
799 host: self.host,
800 port: self.port,
801 database: self.database,
802 user: self.user,
803 password: self.password,
804 tables: self.tables,
805 slot_name: self.slot_name,
806 publication_name: self.publication_name,
807 ssl_mode: self.ssl_mode,
808 table_keys: self.table_keys,
809 };
810
811 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
812 if let Some(mode) = self.dispatch_mode {
813 params = params.with_dispatch_mode(mode);
814 }
815 if let Some(capacity) = self.dispatch_buffer_capacity {
816 params = params.with_dispatch_buffer_capacity(capacity);
817 }
818 if let Some(provider) = self.bootstrap_provider {
819 params = params.with_bootstrap_provider(provider);
820 }
821
822 Ok(PostgresReplicationSource {
823 base: SourceBase::new(params)?,
824 config,
825 cached_schema: Arc::new(std::sync::RwLock::new(None)),
826 })
827 }
828}
829
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration fixture shared by the construction and config tests.
    fn local_test_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    /// Builder preloaded with the database `db` and the user `user`.
    fn db_user_builder(id: &str) -> PostgresSourceBuilder {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
    }

    /// Construction through the builder and the direct constructors.
    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let builder = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret");
            let source = builder.build().unwrap();
            assert_eq!(source.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let source = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                local_test_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(source.is_ok());
            assert_eq!(source.unwrap().id(), "dispatch-source");
        }
    }

    /// Trait accessors, reported properties, schema fallback and type mapping.
    mod properties {
        use super::*;

        /// Wraps a string slice in a JSON string value.
        fn json_string(text: &str) -> serde_json::Value {
            serde_json::Value::String(text.to_string())
        }

        #[test]
        fn test_id_returns_correct_value() {
            let source = db_user_builder("my-pg-source").build().unwrap();
            assert_eq!(source.id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            let source = db_user_builder("test").build().unwrap();
            assert_eq!(source.type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let props = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap()
                .properties();

            assert_eq!(props.get("host"), Some(&json_string("db.example.com")));
            assert_eq!(
                props.get("port"),
                Some(&serde_json::Value::Number(5433.into()))
            );
            assert_eq!(props.get("database"), Some(&json_string("mydb")));
            assert_eq!(props.get("user"), Some(&json_string("app_user")));
        }

        #[test]
        fn test_properties_includes_password() {
            let props = db_user_builder("test")
                .with_password("super_secret_password")
                .build()
                .unwrap()
                .properties();

            assert_eq!(
                props.get("password"),
                Some(&json_string("super_secret_password"))
            );
        }

        #[test]
        fn test_properties_includes_tables() {
            let props = db_user_builder("test")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap()
                .properties();

            let tables = props.get("tables").unwrap().as_array().unwrap();
            assert_eq!(tables.len(), 2);
            assert_eq!(tables[0], "users");
            assert_eq!(tables[1], "orders");
        }

        #[test]
        fn test_describe_schema_falls_back_to_configured_tables() {
            let source = db_user_builder("test")
                .with_tables(vec!["public.users".to_string(), "orders".to_string()])
                .build()
                .unwrap();

            let schema = source
                .describe_schema()
                .expect("configured postgres tables should produce fallback schema");

            assert_eq!(schema.nodes.len(), 2);
            for expected in ["users", "orders"] {
                assert!(schema.nodes.iter().any(|n| n.label == expected));
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_integer() {
            for pg_type in ["integer", "bigint", "smallint"] {
                assert_eq!(
                    postgres_type_to_property_type(pg_type),
                    Some(PropertyType::Integer)
                );
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_float() {
            for pg_type in ["double precision", "real", "numeric", "decimal"] {
                assert_eq!(
                    postgres_type_to_property_type(pg_type),
                    Some(PropertyType::Float)
                );
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_boolean() {
            let mapped = postgres_type_to_property_type("boolean");
            assert_eq!(mapped, Some(PropertyType::Boolean));
        }

        #[test]
        fn test_postgres_type_to_property_type_timestamp() {
            for pg_type in [
                "timestamp with time zone",
                "timestamp without time zone",
                "date",
            ] {
                assert_eq!(
                    postgres_type_to_property_type(pg_type),
                    Some(PropertyType::Timestamp)
                );
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_json() {
            for pg_type in ["json", "jsonb"] {
                assert_eq!(
                    postgres_type_to_property_type(pg_type),
                    Some(PropertyType::Json)
                );
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_string() {
            for pg_type in ["character varying", "text", "uuid"] {
                assert_eq!(
                    postgres_type_to_property_type(pg_type),
                    Some(PropertyType::String)
                );
            }
        }

        #[test]
        fn test_postgres_type_to_property_type_unknown_returns_none() {
            for pg_type in ["point", "polygon", "cidr"] {
                assert_eq!(postgres_type_to_property_type(pg_type), None);
            }
        }
    }

    /// Status reporting and the shape of the reported properties.
    mod lifecycle {
        use super::*;

        /// Resolver that turns a secret reference into a recognizable marker
        /// string and rejects every other kind of value.
        struct TestSecretResolver;

        #[async_trait::async_trait]
        impl drasi_plugin_sdk::resolver::ValueResolver for TestSecretResolver {
            async fn resolve_to_string(
                &self,
                value: &drasi_plugin_sdk::ConfigValue<String>,
            ) -> Result<String, drasi_plugin_sdk::resolver::ResolverError> {
                if let drasi_plugin_sdk::ConfigValue::Secret { name } = value {
                    Ok(format!("resolved-secret-{name}"))
                } else {
                    Err(drasi_plugin_sdk::resolver::ResolverError::WrongResolverType)
                }
            }
        }

        /// Registers [`TestSecretResolver`] as the secret resolver.
        fn ensure_test_secret_resolver() {
            let resolver = std::sync::Arc::new(TestSecretResolver);
            drasi_plugin_sdk::resolver::register_secret_resolver(resolver);
        }

        #[tokio::test]
        async fn test_descriptor_preserves_secret_envelope() {
            use crate::descriptor::PostgresSourceDescriptor;
            use drasi_lib::sources::Source;
            use drasi_plugin_sdk::descriptor::SourcePluginDescriptor;

            ensure_test_secret_resolver();

            let config_json = serde_json::json!({
                "host": "db.example.com",
                "port": 5432,
                "database": "mydb",
                "user": "app_user",
                "password": {
                    "kind": "Secret",
                    "name": "db-password"
                },
                "tables": ["users"],
                "slotName": "drasi_slot",
                "publicationName": "drasi_pub"
            });

            let source = PostgresSourceDescriptor
                .create_source("pg-secret-test", &config_json, true)
                .await
                .expect("descriptor should create source");

            let props = source.properties();

            // The password must still be the secret reference, not its value.
            let password = props.get("password").expect("password must be present");
            assert!(
                password.is_object(),
                "password should be Secret envelope, got: {password}"
            );
            let envelope_kind = password.get("kind").and_then(|v| v.as_str());
            assert_eq!(envelope_kind, Some("Secret"), "envelope kind must be Secret");
            let envelope_name = password.get("name").and_then(|v| v.as_str());
            assert_eq!(
                envelope_name,
                Some("db-password"),
                "secret name must be preserved"
            );

            // The resolved value must not appear anywhere in the properties.
            let serialized_props = serde_json::to_string(&props).unwrap();
            assert!(
                !serialized_props.contains("resolved-secret-db-password"),
                "resolved secret must not appear in properties"
            );

            assert!(
                props.contains_key("slotName"),
                "expected camelCase 'slotName', got keys: {:?}",
                props.keys().collect::<Vec<_>>()
            );
            assert!(
                props.contains_key("publicationName"),
                "expected camelCase 'publicationName'"
            );
        }

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = db_user_builder("test").build().unwrap();
            let status = source.status().await;
            assert_eq!(status, ComponentStatus::Stopped);
        }

        #[test]
        fn test_builder_fallback_produces_camel_case() {
            use drasi_lib::sources::Source;

            let props = PostgresSourceBuilder::new("pg-fallback")
                .with_host("myhost.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("admin")
                .with_password("secret123")
                .with_ssl_mode(SslMode::Require)
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap()
                .properties();

            // Keys use camelCase ...
            assert!(
                props.contains_key("slotName"),
                "expected camelCase 'slotName', got keys: {:?}",
                props.keys().collect::<Vec<_>>()
            );
            assert!(
                props.contains_key("publicationName"),
                "expected camelCase 'publicationName'"
            );
            assert!(
                props.contains_key("sslMode"),
                "expected camelCase 'sslMode'"
            );

            // ... and the snake_case spellings are absent.
            assert!(
                !props.contains_key("slot_name"),
                "should not have snake_case 'slot_name'"
            );
            assert!(
                !props.contains_key("publication_name"),
                "should not have snake_case 'publication_name'"
            );

            let host = props.get("host").and_then(|v| v.as_str());
            assert_eq!(host, Some("myhost.example.com"));
            let port = props.get("port").and_then(|v| v.as_u64());
            assert_eq!(port, Some(5433));
            let database = props.get("database").and_then(|v| v.as_str());
            assert_eq!(database, Some("mydb"));
            let password = props.get("password").and_then(|v| v.as_str());
            assert_eq!(password, Some("secret123"));
        }
    }

    /// Builder defaults and setters, checked against the resulting config.
    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let source = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;
            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();

            let cfg = &source.config;
            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables, vec!["users", "orders"]);
        }

        #[test]
        fn test_builder_add_table() {
            let source = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();

            assert_eq!(source.config.tables, vec!["table1", "table2", "table3"]);
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let source = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            let cfg = &source.config;
            assert_eq!(cfg.slot_name, "custom_slot");
            assert_eq!(cfg.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            let source = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-pg-source");
        }
    }

    /// Serde round-trips and defaults of `PostgresSourceConfig`.
    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let config = local_test_config();

            let json = serde_json::to_string(&config).unwrap();
            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();

            assert_eq!(config, deserialized);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(config.database, "mydb");
            assert_eq!(config.user, "myuser");
            // Fields missing from the JSON take their defaults.
            assert_eq!(config.host, "localhost");
            assert_eq!(config.port, 5432);
            assert_eq!(config.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, vec!["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}
1384
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled.
// Registers `PostgresSourceDescriptor` as the sole source descriptor under the
// plugin id "postgres-source", with no reaction or bootstrap descriptors. All
// three version fields are taken from this crate's `CARGO_PKG_VERSION`.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);
1397);