1pub mod config;
171pub mod connection;
172pub mod decoder;
173pub mod protocol;
174pub mod scram;
175pub mod stream;
176pub mod types;
177
178pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
179
180use anyhow::Result;
181use async_trait::async_trait;
182use log::{error, info};
183use std::collections::HashMap;
184use std::sync::Arc;
185use tokio::sync::RwLock;
186
187use drasi_lib::channels::{DispatchMode, *};
188use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
189use drasi_lib::Source;
190use tracing::Instrument;
191
192pub struct PostgresReplicationSource {
202 base: SourceBase,
204 config: PostgresSourceConfig,
206}
207
208impl PostgresReplicationSource {
209 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
226 PostgresSourceBuilder::new(id)
227 }
228
229 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
261 let id = id.into();
262 let params = SourceBaseParams::new(id);
263 Ok(Self {
264 base: SourceBase::new(params)?,
265 config,
266 })
267 }
268
269 pub fn with_dispatch(
274 id: impl Into<String>,
275 config: PostgresSourceConfig,
276 dispatch_mode: Option<DispatchMode>,
277 dispatch_buffer_capacity: Option<usize>,
278 ) -> Result<Self> {
279 let id = id.into();
280 let mut params = SourceBaseParams::new(id);
281 if let Some(mode) = dispatch_mode {
282 params = params.with_dispatch_mode(mode);
283 }
284 if let Some(capacity) = dispatch_buffer_capacity {
285 params = params.with_dispatch_buffer_capacity(capacity);
286 }
287 Ok(Self {
288 base: SourceBase::new(params)?,
289 config,
290 })
291 }
292}
293
294#[async_trait]
295impl Source for PostgresReplicationSource {
296 fn id(&self) -> &str {
297 &self.base.id
298 }
299
300 fn type_name(&self) -> &str {
301 "postgres"
302 }
303
304 fn properties(&self) -> HashMap<String, serde_json::Value> {
305 let mut props = HashMap::new();
306 props.insert(
307 "host".to_string(),
308 serde_json::Value::String(self.config.host.clone()),
309 );
310 props.insert(
311 "port".to_string(),
312 serde_json::Value::Number(self.config.port.into()),
313 );
314 props.insert(
315 "database".to_string(),
316 serde_json::Value::String(self.config.database.clone()),
317 );
318 props.insert(
319 "user".to_string(),
320 serde_json::Value::String(self.config.user.clone()),
321 );
322 props.insert(
324 "tables".to_string(),
325 serde_json::Value::Array(
326 self.config
327 .tables
328 .iter()
329 .map(|t| serde_json::Value::String(t.clone()))
330 .collect(),
331 ),
332 );
333 props
334 }
335
336 fn auto_start(&self) -> bool {
337 self.base.get_auto_start()
338 }
339
340 async fn start(&self) -> Result<()> {
341 if self.base.get_status().await == ComponentStatus::Running {
342 return Ok(());
343 }
344
345 self.base.set_status(ComponentStatus::Starting).await;
346 info!("Starting PostgreSQL replication source: {}", self.base.id);
347
348 let config = self.config.clone();
349 let source_id = self.base.id.clone();
350 let dispatchers = self.base.dispatchers.clone();
351 let status_tx = self.base.status_tx();
352 let status_clone = self.base.status.clone();
353
354 let instance_id = self
356 .base
357 .context()
358 .await
359 .map(|c| c.instance_id)
360 .unwrap_or_default();
361
362 let source_id_for_span = source_id.clone();
364 let span = tracing::info_span!(
365 "postgres_replication_task",
366 instance_id = %instance_id,
367 component_id = %source_id_for_span,
368 component_type = "source"
369 );
370
371 let task = tokio::spawn(
372 async move {
373 if let Err(e) = run_replication(
374 source_id.clone(),
375 config,
376 dispatchers,
377 status_tx.clone(),
378 status_clone.clone(),
379 )
380 .await
381 {
382 error!("Replication task failed for {source_id}: {e}");
383 *status_clone.write().await = ComponentStatus::Error;
384 if let Some(ref tx) = *status_tx.read().await {
385 let _ = tx
386 .send(ComponentEvent {
387 component_id: source_id,
388 component_type: ComponentType::Source,
389 status: ComponentStatus::Error,
390 timestamp: chrono::Utc::now(),
391 message: Some(format!("Replication failed: {e}")),
392 })
393 .await;
394 }
395 }
396 }
397 .instrument(span),
398 );
399
400 *self.base.task_handle.write().await = Some(task);
401 self.base.set_status(ComponentStatus::Running).await;
402
403 self.base
404 .send_component_event(
405 ComponentStatus::Running,
406 Some("PostgreSQL replication started".to_string()),
407 )
408 .await?;
409
410 Ok(())
411 }
412
413 async fn stop(&self) -> Result<()> {
414 if self.base.get_status().await != ComponentStatus::Running {
415 return Ok(());
416 }
417
418 info!("Stopping PostgreSQL replication source: {}", self.base.id);
419
420 self.base.set_status(ComponentStatus::Stopping).await;
421
422 if let Some(task) = self.base.task_handle.write().await.take() {
424 task.abort();
425 }
426
427 self.base.set_status(ComponentStatus::Stopped).await;
428 self.base
429 .send_component_event(
430 ComponentStatus::Stopped,
431 Some("PostgreSQL replication stopped".to_string()),
432 )
433 .await?;
434
435 Ok(())
436 }
437
438 async fn status(&self) -> ComponentStatus {
439 self.base.get_status().await
440 }
441
442 async fn subscribe(
443 &self,
444 settings: drasi_lib::config::SourceSubscriptionSettings,
445 ) -> Result<SubscriptionResponse> {
446 self.base
447 .subscribe_with_bootstrap(&settings, "PostgreSQL")
448 .await
449 }
450
451 fn as_any(&self) -> &dyn std::any::Any {
452 self
453 }
454
455 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
456 self.base.initialize(context).await;
457 }
458
459 async fn set_bootstrap_provider(
460 &self,
461 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
462 ) {
463 self.base.set_bootstrap_provider(provider).await;
464 }
465}
466
467async fn run_replication(
468 source_id: String,
469 config: PostgresSourceConfig,
470 dispatchers: Arc<
471 RwLock<
472 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
473 >,
474 >,
475 status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
476 status: Arc<RwLock<ComponentStatus>>,
477) -> Result<()> {
478 info!("Starting replication for source {source_id}");
479
480 let mut stream =
481 stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
482
483 stream.run().await
484}
485
486pub struct PostgresSourceBuilder {
506 id: String,
507 host: String,
508 port: u16,
509 database: String,
510 user: String,
511 password: String,
512 tables: Vec<String>,
513 slot_name: String,
514 publication_name: String,
515 ssl_mode: SslMode,
516 table_keys: Vec<TableKeyConfig>,
517 dispatch_mode: Option<DispatchMode>,
518 dispatch_buffer_capacity: Option<usize>,
519 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
520 auto_start: bool,
521}
522
523impl PostgresSourceBuilder {
524 pub fn new(id: impl Into<String>) -> Self {
526 Self {
527 id: id.into(),
528 host: "localhost".to_string(),
529 port: 5432,
530 database: String::new(),
531 user: String::new(),
532 password: String::new(),
533 tables: Vec::new(),
534 slot_name: "drasi_slot".to_string(),
535 publication_name: "drasi_publication".to_string(),
536 ssl_mode: SslMode::default(),
537 table_keys: Vec::new(),
538 dispatch_mode: None,
539 dispatch_buffer_capacity: None,
540 bootstrap_provider: None,
541 auto_start: true,
542 }
543 }
544
545 pub fn with_host(mut self, host: impl Into<String>) -> Self {
547 self.host = host.into();
548 self
549 }
550
551 pub fn with_port(mut self, port: u16) -> Self {
553 self.port = port;
554 self
555 }
556
557 pub fn with_database(mut self, database: impl Into<String>) -> Self {
559 self.database = database.into();
560 self
561 }
562
563 pub fn with_user(mut self, user: impl Into<String>) -> Self {
565 self.user = user.into();
566 self
567 }
568
569 pub fn with_password(mut self, password: impl Into<String>) -> Self {
571 self.password = password.into();
572 self
573 }
574
575 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
577 self.tables = tables;
578 self
579 }
580
581 pub fn add_table(mut self, table: impl Into<String>) -> Self {
583 self.tables.push(table.into());
584 self
585 }
586
587 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
589 self.slot_name = slot_name.into();
590 self
591 }
592
593 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
595 self.publication_name = publication_name.into();
596 self
597 }
598
599 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
601 self.ssl_mode = ssl_mode;
602 self
603 }
604
605 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
607 self.table_keys = table_keys;
608 self
609 }
610
611 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
613 self.table_keys.push(table_key);
614 self
615 }
616
617 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
619 self.dispatch_mode = Some(mode);
620 self
621 }
622
623 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
625 self.dispatch_buffer_capacity = Some(capacity);
626 self
627 }
628
629 pub fn with_bootstrap_provider(
631 mut self,
632 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
633 ) -> Self {
634 self.bootstrap_provider = Some(Box::new(provider));
635 self
636 }
637
638 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
643 self.auto_start = auto_start;
644 self
645 }
646
647 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
649 self.host = config.host;
650 self.port = config.port;
651 self.database = config.database;
652 self.user = config.user;
653 self.password = config.password;
654 self.tables = config.tables;
655 self.slot_name = config.slot_name;
656 self.publication_name = config.publication_name;
657 self.ssl_mode = config.ssl_mode;
658 self.table_keys = config.table_keys;
659 self
660 }
661
662 pub fn build(self) -> Result<PostgresReplicationSource> {
668 let config = PostgresSourceConfig {
669 host: self.host,
670 port: self.port,
671 database: self.database,
672 user: self.user,
673 password: self.password,
674 tables: self.tables,
675 slot_name: self.slot_name,
676 publication_name: self.publication_name,
677 ssl_mode: self.ssl_mode,
678 table_keys: self.table_keys,
679 };
680
681 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
682 if let Some(mode) = self.dispatch_mode {
683 params = params.with_dispatch_mode(mode);
684 }
685 if let Some(capacity) = self.dispatch_buffer_capacity {
686 params = params.with_dispatch_buffer_capacity(capacity);
687 }
688 if let Some(provider) = self.bootstrap_provider {
689 params = params.with_bootstrap_provider(provider);
690 }
691
692 Ok(PostgresReplicationSource {
693 base: SourceBase::new(params)?,
694 config,
695 })
696 }
697}
698
699#[cfg(test)]
700mod tests {
701 use super::*;
702
703 mod construction {
704 use super::*;
705
706 #[test]
707 fn test_builder_with_valid_config() {
708 let source = PostgresSourceBuilder::new("test-source")
709 .with_database("testdb")
710 .with_user("testuser")
711 .build();
712 assert!(source.is_ok());
713 }
714
715 #[test]
716 fn test_builder_with_custom_config() {
717 let source = PostgresSourceBuilder::new("pg-source")
718 .with_host("192.168.1.100")
719 .with_port(5433)
720 .with_database("production")
721 .with_user("admin")
722 .with_password("secret")
723 .build()
724 .unwrap();
725 assert_eq!(source.id(), "pg-source");
726 }
727
728 #[test]
729 fn test_with_dispatch_creates_source() {
730 let config = PostgresSourceConfig {
731 host: "localhost".to_string(),
732 port: 5432,
733 database: "testdb".to_string(),
734 user: "testuser".to_string(),
735 password: String::new(),
736 tables: Vec::new(),
737 slot_name: "drasi_slot".to_string(),
738 publication_name: "drasi_publication".to_string(),
739 ssl_mode: SslMode::default(),
740 table_keys: Vec::new(),
741 };
742 let source = PostgresReplicationSource::with_dispatch(
743 "dispatch-source",
744 config,
745 Some(DispatchMode::Channel),
746 Some(2000),
747 );
748 assert!(source.is_ok());
749 assert_eq!(source.unwrap().id(), "dispatch-source");
750 }
751 }
752
753 mod properties {
754 use super::*;
755
756 #[test]
757 fn test_id_returns_correct_value() {
758 let source = PostgresSourceBuilder::new("my-pg-source")
759 .with_database("db")
760 .with_user("user")
761 .build()
762 .unwrap();
763 assert_eq!(source.id(), "my-pg-source");
764 }
765
766 #[test]
767 fn test_type_name_returns_postgres() {
768 let source = PostgresSourceBuilder::new("test")
769 .with_database("db")
770 .with_user("user")
771 .build()
772 .unwrap();
773 assert_eq!(source.type_name(), "postgres");
774 }
775
776 #[test]
777 fn test_properties_contains_connection_info() {
778 let source = PostgresSourceBuilder::new("test")
779 .with_host("db.example.com")
780 .with_port(5433)
781 .with_database("mydb")
782 .with_user("app_user")
783 .with_password("secret")
784 .with_tables(vec!["users".to_string()])
785 .build()
786 .unwrap();
787 let props = source.properties();
788
789 assert_eq!(
790 props.get("host"),
791 Some(&serde_json::Value::String("db.example.com".to_string()))
792 );
793 assert_eq!(
794 props.get("port"),
795 Some(&serde_json::Value::Number(5433.into()))
796 );
797 assert_eq!(
798 props.get("database"),
799 Some(&serde_json::Value::String("mydb".to_string()))
800 );
801 assert_eq!(
802 props.get("user"),
803 Some(&serde_json::Value::String("app_user".to_string()))
804 );
805 }
806
807 #[test]
808 fn test_properties_does_not_expose_password() {
809 let source = PostgresSourceBuilder::new("test")
810 .with_database("db")
811 .with_user("user")
812 .with_password("super_secret_password")
813 .build()
814 .unwrap();
815 let props = source.properties();
816
817 assert!(!props.contains_key("password"));
819 }
820
821 #[test]
822 fn test_properties_includes_tables() {
823 let source = PostgresSourceBuilder::new("test")
824 .with_database("db")
825 .with_user("user")
826 .with_tables(vec!["users".to_string(), "orders".to_string()])
827 .build()
828 .unwrap();
829 let props = source.properties();
830
831 let tables = props.get("tables").unwrap().as_array().unwrap();
832 assert_eq!(tables.len(), 2);
833 assert_eq!(tables[0], "users");
834 assert_eq!(tables[1], "orders");
835 }
836 }
837
838 mod lifecycle {
839 use super::*;
840
841 #[tokio::test]
842 async fn test_initial_status_is_stopped() {
843 let source = PostgresSourceBuilder::new("test")
844 .with_database("db")
845 .with_user("user")
846 .build()
847 .unwrap();
848 assert_eq!(source.status().await, ComponentStatus::Stopped);
849 }
850 }
851
852 mod builder {
853 use super::*;
854
855 #[test]
856 fn test_postgres_builder_defaults() {
857 let source = PostgresSourceBuilder::new("test").build().unwrap();
858 assert_eq!(source.config.host, "localhost");
859 assert_eq!(source.config.port, 5432);
860 assert_eq!(source.config.slot_name, "drasi_slot");
861 assert_eq!(source.config.publication_name, "drasi_publication");
862 }
863
864 #[test]
865 fn test_postgres_builder_custom_values() {
866 let source = PostgresSourceBuilder::new("test")
867 .with_host("db.example.com")
868 .with_port(5433)
869 .with_database("production")
870 .with_user("app_user")
871 .with_password("secret")
872 .with_tables(vec!["users".to_string(), "orders".to_string()])
873 .build()
874 .unwrap();
875
876 assert_eq!(source.config.host, "db.example.com");
877 assert_eq!(source.config.port, 5433);
878 assert_eq!(source.config.database, "production");
879 assert_eq!(source.config.user, "app_user");
880 assert_eq!(source.config.password, "secret");
881 assert_eq!(source.config.tables.len(), 2);
882 assert_eq!(source.config.tables[0], "users");
883 assert_eq!(source.config.tables[1], "orders");
884 }
885
886 #[test]
887 fn test_builder_add_table() {
888 let source = PostgresSourceBuilder::new("test")
889 .add_table("table1")
890 .add_table("table2")
891 .add_table("table3")
892 .build()
893 .unwrap();
894
895 assert_eq!(source.config.tables.len(), 3);
896 assert_eq!(source.config.tables[0], "table1");
897 assert_eq!(source.config.tables[1], "table2");
898 assert_eq!(source.config.tables[2], "table3");
899 }
900
901 #[test]
902 fn test_builder_slot_and_publication() {
903 let source = PostgresSourceBuilder::new("test")
904 .with_slot_name("custom_slot")
905 .with_publication_name("custom_pub")
906 .build()
907 .unwrap();
908
909 assert_eq!(source.config.slot_name, "custom_slot");
910 assert_eq!(source.config.publication_name, "custom_pub");
911 }
912
913 #[test]
914 fn test_builder_id() {
915 let source = PostgresReplicationSource::builder("my-pg-source")
916 .with_database("db")
917 .with_user("user")
918 .build()
919 .unwrap();
920
921 assert_eq!(source.base.id, "my-pg-source");
922 }
923 }
924
925 mod config {
926 use super::*;
927
928 #[test]
929 fn test_config_serialization() {
930 let config = PostgresSourceConfig {
931 host: "localhost".to_string(),
932 port: 5432,
933 database: "testdb".to_string(),
934 user: "testuser".to_string(),
935 password: String::new(),
936 tables: Vec::new(),
937 slot_name: "drasi_slot".to_string(),
938 publication_name: "drasi_publication".to_string(),
939 ssl_mode: SslMode::default(),
940 table_keys: Vec::new(),
941 };
942
943 let json = serde_json::to_string(&config).unwrap();
944 let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
945
946 assert_eq!(config, deserialized);
947 }
948
949 #[test]
950 fn test_config_deserialization_with_required_fields() {
951 let json = r#"{
952 "database": "mydb",
953 "user": "myuser"
954 }"#;
955 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
956
957 assert_eq!(config.database, "mydb");
958 assert_eq!(config.user, "myuser");
959 assert_eq!(config.host, "localhost"); assert_eq!(config.port, 5432); assert_eq!(config.slot_name, "drasi_slot"); }
963
964 #[test]
965 fn test_config_deserialization_full() {
966 let json = r#"{
967 "host": "db.prod.internal",
968 "port": 5433,
969 "database": "production",
970 "user": "replication_user",
971 "password": "secret",
972 "tables": ["accounts", "transactions"],
973 "slot_name": "prod_slot",
974 "publication_name": "prod_publication"
975 }"#;
976 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
977
978 assert_eq!(config.host, "db.prod.internal");
979 assert_eq!(config.port, 5433);
980 assert_eq!(config.database, "production");
981 assert_eq!(config.user, "replication_user");
982 assert_eq!(config.password, "secret");
983 assert_eq!(config.tables, vec!["accounts", "transactions"]);
984 assert_eq!(config.slot_name, "prod_slot");
985 assert_eq!(config.publication_name, "prod_publication");
986 }
987 }
988}