1pub mod config;
171pub mod connection;
172pub mod decoder;
173pub mod protocol;
174pub mod scram;
175pub mod stream;
176pub mod types;
177
178pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
179
180use anyhow::Result;
181use async_trait::async_trait;
182use log::{error, info};
183use std::collections::HashMap;
184use std::sync::Arc;
185use tokio::sync::RwLock;
186
187use drasi_lib::channels::{DispatchMode, *};
188use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
189use drasi_lib::Source;
190
/// PostgreSQL replication source.
///
/// Combines the shared [`SourceBase`] runtime state with the
/// PostgreSQL-specific [`PostgresSourceConfig`].
pub struct PostgresReplicationSource {
    /// Shared source state; this file reads its id, status, dispatchers and
    /// background task handle.
    base: SourceBase,
    /// Connection and replication settings. Cloned into the background
    /// replication task each time the source is started.
    config: PostgresSourceConfig,
}
206
207impl PostgresReplicationSource {
208 pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
225 PostgresSourceBuilder::new(id)
226 }
227
228 pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
260 let id = id.into();
261 let params = SourceBaseParams::new(id);
262 Ok(Self {
263 base: SourceBase::new(params)?,
264 config,
265 })
266 }
267
268 pub fn with_dispatch(
273 id: impl Into<String>,
274 config: PostgresSourceConfig,
275 dispatch_mode: Option<DispatchMode>,
276 dispatch_buffer_capacity: Option<usize>,
277 ) -> Result<Self> {
278 let id = id.into();
279 let mut params = SourceBaseParams::new(id);
280 if let Some(mode) = dispatch_mode {
281 params = params.with_dispatch_mode(mode);
282 }
283 if let Some(capacity) = dispatch_buffer_capacity {
284 params = params.with_dispatch_buffer_capacity(capacity);
285 }
286 Ok(Self {
287 base: SourceBase::new(params)?,
288 config,
289 })
290 }
291}
292
293#[async_trait]
294impl Source for PostgresReplicationSource {
295 fn id(&self) -> &str {
296 &self.base.id
297 }
298
299 fn type_name(&self) -> &str {
300 "postgres"
301 }
302
303 fn properties(&self) -> HashMap<String, serde_json::Value> {
304 let mut props = HashMap::new();
305 props.insert(
306 "host".to_string(),
307 serde_json::Value::String(self.config.host.clone()),
308 );
309 props.insert(
310 "port".to_string(),
311 serde_json::Value::Number(self.config.port.into()),
312 );
313 props.insert(
314 "database".to_string(),
315 serde_json::Value::String(self.config.database.clone()),
316 );
317 props.insert(
318 "user".to_string(),
319 serde_json::Value::String(self.config.user.clone()),
320 );
321 props.insert(
323 "tables".to_string(),
324 serde_json::Value::Array(
325 self.config
326 .tables
327 .iter()
328 .map(|t| serde_json::Value::String(t.clone()))
329 .collect(),
330 ),
331 );
332 props
333 }
334
335 fn auto_start(&self) -> bool {
336 self.base.get_auto_start()
337 }
338
339 async fn start(&self) -> Result<()> {
340 if self.base.get_status().await == ComponentStatus::Running {
341 return Ok(());
342 }
343
344 self.base.set_status(ComponentStatus::Starting).await;
345 info!("Starting PostgreSQL replication source: {}", self.base.id);
346
347 let config = self.config.clone();
348 let source_id = self.base.id.clone();
349 let dispatchers = self.base.dispatchers.clone();
350 let status_tx = self.base.status_tx();
351 let status_clone = self.base.status.clone();
352
353 let task = tokio::spawn(async move {
354 if let Err(e) = run_replication(
355 source_id.clone(),
356 config,
357 dispatchers,
358 status_tx.clone(),
359 status_clone.clone(),
360 )
361 .await
362 {
363 error!("Replication task failed for {source_id}: {e}");
364 *status_clone.write().await = ComponentStatus::Error;
365 if let Some(ref tx) = *status_tx.read().await {
366 let _ = tx
367 .send(ComponentEvent {
368 component_id: source_id,
369 component_type: ComponentType::Source,
370 status: ComponentStatus::Error,
371 timestamp: chrono::Utc::now(),
372 message: Some(format!("Replication failed: {e}")),
373 })
374 .await;
375 }
376 }
377 });
378
379 *self.base.task_handle.write().await = Some(task);
380 self.base.set_status(ComponentStatus::Running).await;
381
382 self.base
383 .send_component_event(
384 ComponentStatus::Running,
385 Some("PostgreSQL replication started".to_string()),
386 )
387 .await?;
388
389 Ok(())
390 }
391
392 async fn stop(&self) -> Result<()> {
393 if self.base.get_status().await != ComponentStatus::Running {
394 return Ok(());
395 }
396
397 info!("Stopping PostgreSQL replication source: {}", self.base.id);
398
399 self.base.set_status(ComponentStatus::Stopping).await;
400
401 if let Some(task) = self.base.task_handle.write().await.take() {
403 task.abort();
404 }
405
406 self.base.set_status(ComponentStatus::Stopped).await;
407 self.base
408 .send_component_event(
409 ComponentStatus::Stopped,
410 Some("PostgreSQL replication stopped".to_string()),
411 )
412 .await?;
413
414 Ok(())
415 }
416
417 async fn status(&self) -> ComponentStatus {
418 self.base.get_status().await
419 }
420
421 async fn subscribe(
422 &self,
423 settings: drasi_lib::config::SourceSubscriptionSettings,
424 ) -> Result<SubscriptionResponse> {
425 self.base
426 .subscribe_with_bootstrap(&settings, "PostgreSQL")
427 .await
428 }
429
430 fn as_any(&self) -> &dyn std::any::Any {
431 self
432 }
433
434 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
435 self.base.initialize(context).await;
436 }
437
438 async fn set_bootstrap_provider(
439 &self,
440 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
441 ) {
442 self.base.set_bootstrap_provider(provider).await;
443 }
444}
445
446async fn run_replication(
447 source_id: String,
448 config: PostgresSourceConfig,
449 dispatchers: Arc<
450 RwLock<
451 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
452 >,
453 >,
454 status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
455 status: Arc<RwLock<ComponentStatus>>,
456) -> Result<()> {
457 info!("Starting replication for source {source_id}");
458
459 let mut stream =
460 stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
461
462 stream.run().await
463}
464
/// Step-by-step builder for [`PostgresReplicationSource`].
///
/// Created with [`PostgresSourceBuilder::new`], customised through the
/// `with_*` / `add_*` methods and finished with `build()`.
pub struct PostgresSourceBuilder {
    /// Source identifier passed to `SourceBaseParams::new` in `build()`.
    id: String,
    /// Server host; `new` starts with `"localhost"`.
    host: String,
    /// Server port; `new` starts with `5432`.
    port: u16,
    /// Database name; `new` starts with an empty string.
    database: String,
    /// User name; `new` starts with an empty string.
    user: String,
    /// Password; `new` starts with an empty string.
    password: String,
    /// Table names copied into the config; `new` starts with none.
    tables: Vec<String>,
    /// Slot name; `new` starts with `"drasi_slot"`.
    slot_name: String,
    /// Publication name; `new` starts with `"drasi_publication"`.
    publication_name: String,
    /// SSL mode; `new` starts with `SslMode::default()`.
    ssl_mode: SslMode,
    /// Per-table key configuration; `new` starts with none.
    table_keys: Vec<TableKeyConfig>,
    /// Dispatch mode override; only applied in `build()` when `Some`.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity override; only applied in `build()` when `Some`.
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider; only applied in `build()` when `Some`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag passed to the base params; `new` starts with `true`.
    auto_start: bool,
}
501
502impl PostgresSourceBuilder {
503 pub fn new(id: impl Into<String>) -> Self {
505 Self {
506 id: id.into(),
507 host: "localhost".to_string(),
508 port: 5432,
509 database: String::new(),
510 user: String::new(),
511 password: String::new(),
512 tables: Vec::new(),
513 slot_name: "drasi_slot".to_string(),
514 publication_name: "drasi_publication".to_string(),
515 ssl_mode: SslMode::default(),
516 table_keys: Vec::new(),
517 dispatch_mode: None,
518 dispatch_buffer_capacity: None,
519 bootstrap_provider: None,
520 auto_start: true,
521 }
522 }
523
524 pub fn with_host(mut self, host: impl Into<String>) -> Self {
526 self.host = host.into();
527 self
528 }
529
530 pub fn with_port(mut self, port: u16) -> Self {
532 self.port = port;
533 self
534 }
535
536 pub fn with_database(mut self, database: impl Into<String>) -> Self {
538 self.database = database.into();
539 self
540 }
541
542 pub fn with_user(mut self, user: impl Into<String>) -> Self {
544 self.user = user.into();
545 self
546 }
547
548 pub fn with_password(mut self, password: impl Into<String>) -> Self {
550 self.password = password.into();
551 self
552 }
553
554 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
556 self.tables = tables;
557 self
558 }
559
560 pub fn add_table(mut self, table: impl Into<String>) -> Self {
562 self.tables.push(table.into());
563 self
564 }
565
566 pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
568 self.slot_name = slot_name.into();
569 self
570 }
571
572 pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
574 self.publication_name = publication_name.into();
575 self
576 }
577
578 pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
580 self.ssl_mode = ssl_mode;
581 self
582 }
583
584 pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
586 self.table_keys = table_keys;
587 self
588 }
589
590 pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
592 self.table_keys.push(table_key);
593 self
594 }
595
596 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
598 self.dispatch_mode = Some(mode);
599 self
600 }
601
602 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
604 self.dispatch_buffer_capacity = Some(capacity);
605 self
606 }
607
608 pub fn with_bootstrap_provider(
610 mut self,
611 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
612 ) -> Self {
613 self.bootstrap_provider = Some(Box::new(provider));
614 self
615 }
616
617 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
622 self.auto_start = auto_start;
623 self
624 }
625
626 pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
628 self.host = config.host;
629 self.port = config.port;
630 self.database = config.database;
631 self.user = config.user;
632 self.password = config.password;
633 self.tables = config.tables;
634 self.slot_name = config.slot_name;
635 self.publication_name = config.publication_name;
636 self.ssl_mode = config.ssl_mode;
637 self.table_keys = config.table_keys;
638 self
639 }
640
641 pub fn build(self) -> Result<PostgresReplicationSource> {
647 let config = PostgresSourceConfig {
648 host: self.host,
649 port: self.port,
650 database: self.database,
651 user: self.user,
652 password: self.password,
653 tables: self.tables,
654 slot_name: self.slot_name,
655 publication_name: self.publication_name,
656 ssl_mode: self.ssl_mode,
657 table_keys: self.table_keys,
658 };
659
660 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
661 if let Some(mode) = self.dispatch_mode {
662 params = params.with_dispatch_mode(mode);
663 }
664 if let Some(capacity) = self.dispatch_buffer_capacity {
665 params = params.with_dispatch_buffer_capacity(capacity);
666 }
667 if let Some(provider) = self.bootstrap_provider {
668 params = params.with_bootstrap_provider(provider);
669 }
670
671 Ok(PostgresReplicationSource {
672 base: SourceBase::new(params)?,
673 config,
674 })
675 }
676}
677
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder with only the database and user filled in.
    fn builder_for(id: &str, database: &str, user: &str) -> PostgresSourceBuilder {
        PostgresSourceBuilder::new(id)
            .with_database(database)
            .with_user(user)
    }

    /// Config literal for a local `testdb` / `testuser` connection with the
    /// same slot and publication names the builder starts from.
    fn local_test_config() -> PostgresSourceConfig {
        PostgresSourceConfig {
            host: "localhost".to_string(),
            port: 5432,
            database: "testdb".to_string(),
            user: "testuser".to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: SslMode::default(),
            table_keys: Vec::new(),
        }
    }

    /// Shorthand for a JSON string value.
    fn json_str(text: &str) -> serde_json::Value {
        serde_json::Value::String(text.to_string())
    }

    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let built = builder_for("test-source", "testdb", "testuser").build();
            assert!(built.is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let builder = PostgresSourceBuilder::new("pg-source")
                .with_host("192.168.1.100")
                .with_port(5433)
                .with_database("production")
                .with_user("admin")
                .with_password("secret");
            let source = builder.build().unwrap();
            assert_eq!(source.id(), "pg-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            let created = PostgresReplicationSource::with_dispatch(
                "dispatch-source",
                local_test_config(),
                Some(DispatchMode::Channel),
                Some(2000),
            );
            assert!(created.is_ok());
            assert_eq!(created.unwrap().id(), "dispatch-source");
        }
    }

    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            let source = builder_for("my-pg-source", "db", "user").build().unwrap();
            assert_eq!(source.id(), "my-pg-source");
        }

        #[test]
        fn test_type_name_returns_postgres() {
            let source = builder_for("test", "db", "user").build().unwrap();
            assert_eq!(source.type_name(), "postgres");
        }

        #[test]
        fn test_properties_contains_connection_info() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("mydb")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string()])
                .build()
                .unwrap();
            let props = source.properties();

            let expected_host = json_str("db.example.com");
            let expected_port = serde_json::Value::Number(5433.into());
            let expected_database = json_str("mydb");
            let expected_user = json_str("app_user");

            assert_eq!(props.get("host"), Some(&expected_host));
            assert_eq!(props.get("port"), Some(&expected_port));
            assert_eq!(props.get("database"), Some(&expected_database));
            assert_eq!(props.get("user"), Some(&expected_user));
        }

        #[test]
        fn test_properties_does_not_expose_password() {
            let source = builder_for("test", "db", "user")
                .with_password("super_secret_password")
                .build()
                .unwrap();
            let props = source.properties();

            assert!(props.get("password").is_none());
        }

        #[test]
        fn test_properties_includes_tables() {
            let source = builder_for("test", "db", "user")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let props = source.properties();

            let listed = props.get("tables").unwrap().as_array().unwrap();
            assert_eq!(listed.len(), 2);
            assert_eq!(listed[0], "users");
            assert_eq!(listed[1], "orders");
        }
    }

    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = builder_for("test", "db", "user").build().unwrap();
            let initial = source.status().await;
            assert_eq!(initial, ComponentStatus::Stopped);
        }
    }

    mod builder {
        use super::*;

        #[test]
        fn test_postgres_builder_defaults() {
            let source = PostgresSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "localhost");
            assert_eq!(cfg.port, 5432);
            assert_eq!(cfg.slot_name, "drasi_slot");
            assert_eq!(cfg.publication_name, "drasi_publication");
        }

        #[test]
        fn test_postgres_builder_custom_values() {
            let source = PostgresSourceBuilder::new("test")
                .with_host("db.example.com")
                .with_port(5433)
                .with_database("production")
                .with_user("app_user")
                .with_password("secret")
                .with_tables(vec!["users".to_string(), "orders".to_string()])
                .build()
                .unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "db.example.com");
            assert_eq!(cfg.port, 5433);
            assert_eq!(cfg.database, "production");
            assert_eq!(cfg.user, "app_user");
            assert_eq!(cfg.password, "secret");
            assert_eq!(cfg.tables.len(), 2);
            assert_eq!(cfg.tables[0], "users");
            assert_eq!(cfg.tables[1], "orders");
        }

        #[test]
        fn test_builder_add_table() {
            let source = PostgresSourceBuilder::new("test")
                .add_table("table1")
                .add_table("table2")
                .add_table("table3")
                .build()
                .unwrap();
            let tables = &source.config.tables;

            assert_eq!(tables.len(), 3);
            assert_eq!(tables[0], "table1");
            assert_eq!(tables[1], "table2");
            assert_eq!(tables[2], "table3");
        }

        #[test]
        fn test_builder_slot_and_publication() {
            let source = PostgresSourceBuilder::new("test")
                .with_slot_name("custom_slot")
                .with_publication_name("custom_pub")
                .build()
                .unwrap();

            assert_eq!(source.config.slot_name, "custom_slot");
            assert_eq!(source.config.publication_name, "custom_pub");
        }

        #[test]
        fn test_builder_id() {
            // Goes through `PostgresReplicationSource::builder` on purpose.
            let source = PostgresReplicationSource::builder("my-pg-source")
                .with_database("db")
                .with_user("user")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-pg-source");
        }
    }

    mod config {
        use super::*;

        #[test]
        fn test_config_serialization() {
            let original = local_test_config();

            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: PostgresSourceConfig = serde_json::from_str(&encoded).unwrap();

            assert_eq!(original, decoded);
        }

        #[test]
        fn test_config_deserialization_with_required_fields() {
            let json = r#"{
                "database": "mydb",
                "user": "myuser"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.database, "mydb");
            assert_eq!(parsed.user, "myuser");
            // Fields absent from the JSON fall back to these values.
            assert_eq!(parsed.host, "localhost");
            assert_eq!(parsed.port, 5432);
            assert_eq!(parsed.slot_name, "drasi_slot");
        }

        #[test]
        fn test_config_deserialization_full() {
            let json = r#"{
                "host": "db.prod.internal",
                "port": 5433,
                "database": "production",
                "user": "replication_user",
                "password": "secret",
                "tables": ["accounts", "transactions"],
                "slot_name": "prod_slot",
                "publication_name": "prod_publication"
            }"#;
            let parsed: PostgresSourceConfig = serde_json::from_str(json).unwrap();

            assert_eq!(parsed.host, "db.prod.internal");
            assert_eq!(parsed.port, 5433);
            assert_eq!(parsed.database, "production");
            assert_eq!(parsed.user, "replication_user");
            assert_eq!(parsed.password, "secret");
            assert_eq!(parsed.tables, vec!["accounts", "transactions"]);
            assert_eq!(parsed.slot_name, "prod_slot");
            assert_eq!(parsed.publication_name, "prod_publication");
        }
    }
}