1#![allow(unexpected_cfgs)]
2pub mod config;
172pub mod connection;
173pub mod decoder;
174pub mod descriptor;
175pub mod protocol;
176pub mod scram;
177pub mod stream;
178pub mod types;
179
180pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
181
182use anyhow::Result;
183use async_trait::async_trait;
184use log::{error, info};
185use std::collections::HashMap;
186use std::sync::Arc;
187use tokio::sync::RwLock;
188
189use drasi_lib::channels::{DispatchMode, *};
190use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
191use drasi_lib::Source;
192use tracing::Instrument;
193
/// PostgreSQL replication source.
///
/// Pairs the shared [`SourceBase`] state (id, status, dispatchers, task handle) with the
/// PostgreSQL-specific [`PostgresSourceConfig`] that is handed to the replication task.
pub struct PostgresReplicationSource {
    /// Common source plumbing: identity, lifecycle status, dispatchers and the task handle.
    base: SourceBase,
    /// Connection and replication settings; cloned into the background task on `start`.
    config: PostgresSourceConfig,
}
209
210impl PostgresReplicationSource {
211 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
228 PostgresSourceBuilder::new(id)
229 }
230
231 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
263 let id = id.into();
264 let params = SourceBaseParams::new(id);
265 Ok(Self {
266 base: SourceBase::new(params)?,
267 config,
268 })
269 }
270
271 pub fn with_dispatch(
276 id: impl Into<String>,
277 config: PostgresSourceConfig,
278 dispatch_mode: Option<DispatchMode>,
279 dispatch_buffer_capacity: Option<usize>,
280 ) -> Result<Self> {
281 let id = id.into();
282 let mut params = SourceBaseParams::new(id);
283 if let Some(mode) = dispatch_mode {
284 params = params.with_dispatch_mode(mode);
285 }
286 if let Some(capacity) = dispatch_buffer_capacity {
287 params = params.with_dispatch_buffer_capacity(capacity);
288 }
289 Ok(Self {
290 base: SourceBase::new(params)?,
291 config,
292 })
293 }
294}
295
296#[async_trait]
297impl Source for PostgresReplicationSource {
298 fn id(&self) -> &str {
299 &self.base.id
300 }
301
302 fn type_name(&self) -> &str {
303 "postgres"
304 }
305
306 fn properties(&self) -> HashMap<String, serde_json::Value> {
307 let mut props = HashMap::new();
308 props.insert(
309 "host".to_string(),
310 serde_json::Value::String(self.config.host.clone()),
311 );
312 props.insert(
313 "port".to_string(),
314 serde_json::Value::Number(self.config.port.into()),
315 );
316 props.insert(
317 "database".to_string(),
318 serde_json::Value::String(self.config.database.clone()),
319 );
320 props.insert(
321 "user".to_string(),
322 serde_json::Value::String(self.config.user.clone()),
323 );
324 props.insert(
326 "tables".to_string(),
327 serde_json::Value::Array(
328 self.config
329 .tables
330 .iter()
331 .map(|t| serde_json::Value::String(t.clone()))
332 .collect(),
333 ),
334 );
335 props
336 }
337
338 fn auto_start(&self) -> bool {
339 self.base.get_auto_start()
340 }
341
342 async fn start(&self) -> Result<()> {
343 if self.base.get_status().await == ComponentStatus::Running {
344 return Ok(());
345 }
346
347 self.base.set_status(ComponentStatus::Starting).await;
348 info!("Starting PostgreSQL replication source: {}", self.base.id);
349
350 let config = self.config.clone();
351 let source_id = self.base.id.clone();
352 let dispatchers = self.base.dispatchers.clone();
353 let status_tx = self.base.status_tx();
354 let status_clone = self.base.status.clone();
355
356 let instance_id = self
358 .base
359 .context()
360 .await
361 .map(|c| c.instance_id)
362 .unwrap_or_default();
363
364 let source_id_for_span = source_id.clone();
366 let span = tracing::info_span!(
367 "postgres_replication_task",
368 instance_id = %instance_id,
369 component_id = %source_id_for_span,
370 component_type = "source"
371 );
372
373 let task = tokio::spawn(
374 async move {
375 if let Err(e) = run_replication(
376 source_id.clone(),
377 config,
378 dispatchers,
379 status_tx.clone(),
380 status_clone.clone(),
381 )
382 .await
383 {
384 error!("Replication task failed for {source_id}: {e}");
385 *status_clone.write().await = ComponentStatus::Error;
386 if let Some(ref tx) = *status_tx.read().await {
387 let _ = tx
388 .send(ComponentEvent {
389 component_id: source_id,
390 component_type: ComponentType::Source,
391 status: ComponentStatus::Error,
392 timestamp: chrono::Utc::now(),
393 message: Some(format!("Replication failed: {e}")),
394 })
395 .await;
396 }
397 }
398 }
399 .instrument(span),
400 );
401
402 *self.base.task_handle.write().await = Some(task);
403 self.base.set_status(ComponentStatus::Running).await;
404
405 self.base
406 .send_component_event(
407 ComponentStatus::Running,
408 Some("PostgreSQL replication started".to_string()),
409 )
410 .await?;
411
412 Ok(())
413 }
414
415 async fn stop(&self) -> Result<()> {
416 if self.base.get_status().await != ComponentStatus::Running {
417 return Ok(());
418 }
419
420 info!("Stopping PostgreSQL replication source: {}", self.base.id);
421
422 self.base.set_status(ComponentStatus::Stopping).await;
423
424 if let Some(task) = self.base.task_handle.write().await.take() {
426 task.abort();
427 }
428
429 self.base.set_status(ComponentStatus::Stopped).await;
430 self.base
431 .send_component_event(
432 ComponentStatus::Stopped,
433 Some("PostgreSQL replication stopped".to_string()),
434 )
435 .await?;
436
437 Ok(())
438 }
439
440 async fn status(&self) -> ComponentStatus {
441 self.base.get_status().await
442 }
443
444 async fn subscribe(
445 &self,
446 settings: drasi_lib::config::SourceSubscriptionSettings,
447 ) -> Result<SubscriptionResponse> {
448 self.base
449 .subscribe_with_bootstrap(&settings, "PostgreSQL")
450 .await
451 }
452
453 fn as_any(&self) -> &dyn std::any::Any {
454 self
455 }
456
457 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
458 self.base.initialize(context).await;
459 }
460
461 async fn set_bootstrap_provider(
462 &self,
463 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
464 ) {
465 self.base.set_bootstrap_provider(provider).await;
466 }
467}
468
469async fn run_replication(
470 source_id: String,
471 config: PostgresSourceConfig,
472 dispatchers: Arc<
473 RwLock<
474 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
475 >,
476 >,
477 status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
478 status: Arc<RwLock<ComponentStatus>>,
479) -> Result<()> {
480 info!("Starting replication for source {source_id}");
481
482 let mut stream =
483 stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
484
485 stream.run().await
486}
487
/// Fluent builder for [`PostgresReplicationSource`].
///
/// Collects the individual [`PostgresSourceConfig`] fields plus the `SourceBase` options
/// (dispatch settings, bootstrap provider, auto-start) and assembles them in `build`.
pub struct PostgresSourceBuilder {
    /// Source id passed to `SourceBaseParams::new`.
    id: String,
    /// Server host; defaults to `"localhost"`.
    host: String,
    /// Server port; defaults to `5432`.
    port: u16,
    /// Database name; defaults to the empty string.
    database: String,
    /// User name; defaults to the empty string.
    user: String,
    /// Password; defaults to the empty string.
    password: String,
    /// Table names copied into the config's `tables`; defaults to empty.
    tables: Vec<String>,
    /// Value for the config's `slot_name`; defaults to `"drasi_slot"`.
    slot_name: String,
    /// Value for the config's `publication_name`; defaults to `"drasi_publication"`.
    publication_name: String,
    /// SSL mode; defaults to `SslMode::default()`.
    ssl_mode: SslMode,
    /// Per-table key configuration; defaults to empty.
    table_keys: Vec<TableKeyConfig>,
    /// Optional dispatch mode override; `None` leaves the `SourceBaseParams` value untouched.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity override; `None` leaves the params value untouched.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider handed to `SourceBaseParams` in `build`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag handed to `SourceBaseParams`; defaults to `true`.
    auto_start: bool,
}
524
525impl PostgresSourceBuilder {
526 pub fn new(id: impl Into<String>) -> Self {
528 Self {
529 id: id.into(),
530 host: "localhost".to_string(),
531 port: 5432,
532 database: String::new(),
533 user: String::new(),
534 password: String::new(),
535 tables: Vec::new(),
536 slot_name: "drasi_slot".to_string(),
537 publication_name: "drasi_publication".to_string(),
538 ssl_mode: SslMode::default(),
539 table_keys: Vec::new(),
540 dispatch_mode: None,
541 dispatch_buffer_capacity: None,
542 bootstrap_provider: None,
543 auto_start: true,
544 }
545 }
546
547 pub fn with_host(mut self, host: impl Into<String>) -> Self {
549 self.host = host.into();
550 self
551 }
552
553 pub fn with_port(mut self, port: u16) -> Self {
555 self.port = port;
556 self
557 }
558
559 pub fn with_database(mut self, database: impl Into<String>) -> Self {
561 self.database = database.into();
562 self
563 }
564
565 pub fn with_user(mut self, user: impl Into<String>) -> Self {
567 self.user = user.into();
568 self
569 }
570
571 pub fn with_password(mut self, password: impl Into<String>) -> Self {
573 self.password = password.into();
574 self
575 }
576
577 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
579 self.tables = tables;
580 self
581 }
582
583 pub fn add_table(mut self, table: impl Into<String>) -> Self {
585 self.tables.push(table.into());
586 self
587 }
588
589 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
591 self.slot_name = slot_name.into();
592 self
593 }
594
595 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
597 self.publication_name = publication_name.into();
598 self
599 }
600
601 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
603 self.ssl_mode = ssl_mode;
604 self
605 }
606
607 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
609 self.table_keys = table_keys;
610 self
611 }
612
613 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
615 self.table_keys.push(table_key);
616 self
617 }
618
619 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
621 self.dispatch_mode = Some(mode);
622 self
623 }
624
625 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
627 self.dispatch_buffer_capacity = Some(capacity);
628 self
629 }
630
631 pub fn with_bootstrap_provider(
633 mut self,
634 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
635 ) -> Self {
636 self.bootstrap_provider = Some(Box::new(provider));
637 self
638 }
639
640 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
645 self.auto_start = auto_start;
646 self
647 }
648
649 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
651 self.host = config.host;
652 self.port = config.port;
653 self.database = config.database;
654 self.user = config.user;
655 self.password = config.password;
656 self.tables = config.tables;
657 self.slot_name = config.slot_name;
658 self.publication_name = config.publication_name;
659 self.ssl_mode = config.ssl_mode;
660 self.table_keys = config.table_keys;
661 self
662 }
663
664 pub fn build(self) -> Result<PostgresReplicationSource> {
670 let config = PostgresSourceConfig {
671 host: self.host,
672 port: self.port,
673 database: self.database,
674 user: self.user,
675 password: self.password,
676 tables: self.tables,
677 slot_name: self.slot_name,
678 publication_name: self.publication_name,
679 ssl_mode: self.ssl_mode,
680 table_keys: self.table_keys,
681 };
682
683 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
684 if let Some(mode) = self.dispatch_mode {
685 params = params.with_dispatch_mode(mode);
686 }
687 if let Some(capacity) = self.dispatch_buffer_capacity {
688 params = params.with_dispatch_buffer_capacity(capacity);
689 }
690 if let Some(provider) = self.bootstrap_provider {
691 params = params.with_bootstrap_provider(provider);
692 }
693
694 Ok(PostgresReplicationSource {
695 base: SourceBase::new(params)?,
696 config,
697 })
698 }
699}
700
#[cfg(test)]
mod tests {
    use super::*;

    /// Hand-built configuration shared by the tests that need a complete config value.
    fn sample_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    /// Builds a source with placeholder database/user values for tests that only look at
    /// the id, type name or lifecycle status.
    fn minimal_source(id: &str) -> PostgresReplicationSource {
        PostgresSourceBuilder::new(id)
            .with_database("db")
            .with_user("user")
            .build()
            .unwrap()
    }

    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = PostgresSourceBuilder::new("test-source")
                .with_database("testdb")
                .with_user("testuser")
                .build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let builder = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret");
            let source = builder.build().unwrap();
            assert_eq!(source.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let created = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                sample_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(created.is_ok());
            assert_eq!(created.unwrap().id(), "dispatch-source");
        }
    }

    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            assert_eq!(minimal_source("my-pg-source").id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            assert_eq!(minimal_source("test").type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let props = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap()
                .properties();

            assert_eq!(
                props.get("host"),
                Some(&serde_json::json!("db.example.com"))
            );
            assert_eq!(props.get("port"), Some(&serde_json::json!(5433)));
            assert_eq!(props.get("database"), Some(&serde_json::json!("mydb")));
            assert_eq!(props.get("user"), Some(&serde_json::json!("app_user")));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let props = PostgresSourceBuilder::new("test")
                .with_database("db")
                .with_user("user")
                .with_password("super_secret_password")
                .build()
                .unwrap()
                .properties();

            assert!(!props.contains_key("password"));
        }

        #[test]
        fn test_properties_includes_tables() {
            let props = PostgresSourceBuilder::new("test")
                .with_database("db")
                .with_user("user")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap()
                .properties();

            let tables = props.get("tables").unwrap().as_array().unwrap();
            assert_eq!(tables.len(), 2);
            assert_eq!(tables[0], "users");
            assert_eq!(tables[1], "orders");
        }
    }

    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = minimal_source("test");
            assert_eq!(source.status().await, ComponentStatus::Stopped);
        }
    }

    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let source = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;
            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables.len(), 2);
            assert_eq!(cfg.tables[0], "users");
            assert_eq!(cfg.tables[1], "orders");
        }

        #[test]
        fn test_builder_add_table() {
            let source = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();
            let tables = &source.config.tables;

            assert_eq!(tables.len(), 3);
            assert_eq!(tables[0], "table1");
            assert_eq!(tables[1], "table2");
            assert_eq!(tables[2], "table3");
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let source = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            assert_eq!(source.config.slot_name, "custom_slot");
            assert_eq!(source.config.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            // Goes through the associated `builder` function rather than the builder's `new`.
            let source = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-pg-source");
        }
    }

    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let config = sample_config();

            let json = serde_json::to_string(&config).unwrap();
            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();

            assert_eq!(config, deserialized);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(config.database, "mydb");
            assert_eq!(config.user, "myuser");
            // Fields absent from the JSON fall back to their defaults.
            assert_eq!(config.host, "localhost");
            assert_eq!(config.port, 5432);
            assert_eq!(config.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(config.host, "db.prod.internal");
            assert_eq!(config.port, 5433);
            assert_eq!(config.database, "production");
            assert_eq!(config.user, "replication_user");
            assert_eq!(config.password, "secret");
            assert_eq!(config.tables, vec!["accounts", "transactions"]);
            assert_eq!(config.slot_name, "prod_slot");
            assert_eq!(config.publication_name, "prod_publication");
        }
    }
}
991
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled.
//
// Registers `descriptor::PostgresSourceDescriptor` as the only source descriptor under the
// plugin id "postgres-source"; the reaction and bootstrap descriptor lists are empty.
//
// NOTE(review): `core_version`, `lib_version` and `plugin_version` are all taken from this
// crate's own `CARGO_PKG_VERSION` — confirm that this is the intended value for the core
// and lib versions.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "postgres-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PostgresSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);