1use serde::{Deserialize, Serialize};
69use std::collections::HashMap;
70use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
71use std::sync::Arc;
72use tokio::sync::RwLock;
73use tracing::{debug, info, warn};
74use uuid::Uuid;
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78#[serde(rename_all = "kebab-case")]
79pub enum SignalAction {
80 ExecuteSnapshot,
82 StopSnapshot,
84 PauseSnapshot,
86 ResumeSnapshot,
88 Log,
90 Custom(String),
92}
93
94impl SignalAction {
95 pub fn as_str(&self) -> &str {
97 match self {
98 SignalAction::ExecuteSnapshot => "execute-snapshot",
99 SignalAction::StopSnapshot => "stop-snapshot",
100 SignalAction::PauseSnapshot => "pause-snapshot",
101 SignalAction::ResumeSnapshot => "resume-snapshot",
102 SignalAction::Log => "log",
103 SignalAction::Custom(name) => name,
104 }
105 }
106
107 pub fn parse(s: &str) -> Self {
109 match s {
110 "execute-snapshot" => SignalAction::ExecuteSnapshot,
111 "stop-snapshot" => SignalAction::StopSnapshot,
112 "pause-snapshot" => SignalAction::PauseSnapshot,
113 "resume-snapshot" => SignalAction::ResumeSnapshot,
114 "log" => SignalAction::Log,
115 other => SignalAction::Custom(other.to_string()),
116 }
117 }
118}
119
/// Payload attached to a signal.
///
/// Every field falls back to its default when missing from the serialized
/// form. Keys other than `data-collections` and `type` are gathered into
/// `properties` (serde `flatten`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SignalData {
    /// Serialized under the key `data-collections`.
    #[serde(default, rename = "data-collections")]
    pub data_collections: Vec<String>,
    /// Serialized under the key `type`; the constructors in this file use
    /// the values `"incremental"` and `"blocking"`.
    #[serde(default, rename = "type")]
    pub snapshot_type: Option<String>,
    /// All remaining key/value pairs of the serialized object.
    #[serde(default, flatten)]
    pub properties: HashMap<String, serde_json::Value>,
}
133
134impl SignalData {
135 pub fn empty() -> Self {
137 Self {
138 data_collections: Vec::new(),
139 snapshot_type: None,
140 properties: HashMap::new(),
141 }
142 }
143
144 pub fn for_snapshot(tables: Vec<String>, snapshot_type: &str) -> Self {
146 Self {
147 data_collections: tables,
148 snapshot_type: Some(snapshot_type.to_string()),
149 properties: HashMap::new(),
150 }
151 }
152
153 pub fn for_log(message: &str) -> Self {
155 let mut properties = HashMap::new();
156 properties.insert(
157 "message".to_string(),
158 serde_json::Value::String(message.to_string()),
159 );
160 Self {
161 data_collections: Vec::new(),
162 snapshot_type: None,
163 properties,
164 }
165 }
166
167 pub fn with_property(mut self, key: &str, value: serde_json::Value) -> Self {
169 self.properties.insert(key.to_string(), value);
170 self
171 }
172
173 pub fn get_property(&self, key: &str) -> Option<&serde_json::Value> {
175 self.properties.get(key)
176 }
177
178 pub fn log_message(&self) -> Option<&str> {
180 self.properties.get("message")?.as_str()
181 }
182}
183
/// A single signal: an identifier, the requested action and its payload,
/// plus a timestamp and the source it came from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Signal {
    /// Identifier of the signal (the convenience constructors use a v4 UUID).
    pub id: String,
    /// Requested action; serialized under the key `type`.
    #[serde(rename = "type")]
    pub action: SignalAction,
    /// Payload; defaults to `SignalData::default()` when absent.
    #[serde(default)]
    pub data: SignalData,
    /// Epoch milliseconds (UTC); defaults to the time of deserialization
    /// when absent.
    #[serde(default = "default_timestamp")]
    pub timestamp: i64,
    /// Where the signal came from; defaults to `SignalSource::Api`.
    #[serde(default)]
    pub source: SignalSource,
}
202
203fn default_timestamp() -> i64 {
204 chrono::Utc::now().timestamp_millis()
205}
206
207impl Signal {
208 pub fn new(id: impl Into<String>, action: SignalAction, data: SignalData) -> Self {
210 Self {
211 id: id.into(),
212 action,
213 data,
214 timestamp: chrono::Utc::now().timestamp_millis(),
215 source: SignalSource::Api,
216 }
217 }
218
219 pub fn execute_snapshot(tables: Vec<String>) -> Self {
221 Self::new(
222 Uuid::new_v4().to_string(),
223 SignalAction::ExecuteSnapshot,
224 SignalData::for_snapshot(tables, "incremental"),
225 )
226 }
227
228 pub fn blocking_snapshot(tables: Vec<String>) -> Self {
230 Self::new(
231 Uuid::new_v4().to_string(),
232 SignalAction::ExecuteSnapshot,
233 SignalData::for_snapshot(tables, "blocking"),
234 )
235 }
236
237 pub fn stop_snapshot() -> Self {
239 Self::new(
240 Uuid::new_v4().to_string(),
241 SignalAction::StopSnapshot,
242 SignalData::empty(),
243 )
244 }
245
246 pub fn pause() -> Self {
248 Self::new(
249 Uuid::new_v4().to_string(),
250 SignalAction::PauseSnapshot,
251 SignalData::empty(),
252 )
253 }
254
255 pub fn resume() -> Self {
257 Self::new(
258 Uuid::new_v4().to_string(),
259 SignalAction::ResumeSnapshot,
260 SignalData::empty(),
261 )
262 }
263
264 pub fn log(message: &str) -> Self {
266 Self::new(
267 Uuid::new_v4().to_string(),
268 SignalAction::Log,
269 SignalData::for_log(message),
270 )
271 }
272
273 pub fn custom(action: &str, data: SignalData) -> Self {
275 Self::new(
276 Uuid::new_v4().to_string(),
277 SignalAction::Custom(action.to_string()),
278 data,
279 )
280 }
281
282 pub fn with_source(mut self, source: SignalSource) -> Self {
284 self.source = source;
285 self
286 }
287
288 pub fn is_snapshot_action(&self) -> bool {
290 matches!(
291 self.action,
292 SignalAction::ExecuteSnapshot | SignalAction::StopSnapshot
293 )
294 }
295
296 pub fn is_control_action(&self) -> bool {
298 matches!(
299 self.action,
300 SignalAction::PauseSnapshot | SignalAction::ResumeSnapshot
301 )
302 }
303}
304
/// Origin of a signal.
///
/// Serialized in lowercase (`api`, `source`, `topic`, `file`).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SignalSource {
    /// Default source; also what `Signal::new` assigns.
    #[default]
    Api,
    /// Assigned by `SignalProcessor::parse_from_row` and to records read
    /// from a channel named `"source"`.
    Source,
    /// Assigned to records read from a channel named `"topic"`.
    Topic,
    /// Assigned to records read from a channel named `"file"`.
    File,
}
319
/// Outcome of processing one signal.
#[derive(Debug, Clone)]
pub enum SignalResult {
    /// The signal was handled.
    Success,
    /// The signal was accepted but not handled yet; the string describes why.
    Pending(String),
    /// The signal was deliberately skipped; the string gives the reason.
    Ignored(String),
    /// Handling failed; the string is the error message.
    Failed(String),
}

impl SignalResult {
    /// `true` for `Success` and `Pending`, `false` for `Ignored` and `Failed`.
    pub fn is_success(&self) -> bool {
        match self {
            Self::Success | Self::Pending(_) => true,
            Self::Ignored(_) | Self::Failed(_) => false,
        }
    }

    /// Returns the message of a `Failed` result and `None` for every other
    /// variant (including `Ignored`).
    pub fn error_message(&self) -> Option<&str> {
        if let Self::Failed(reason) = self {
            Some(reason.as_str())
        } else {
            None
        }
    }
}
347
/// Asynchronous, thread-shareable (`Send + Sync`) handler for signals.
///
/// NOTE(review): nothing in the visible part of this file implements or
/// consumes this trait; `SignalProcessor` stores boxed closures instead.
pub trait SignalHandler: Send + Sync {
    /// Handles `signal`; the returned future resolves to the outcome.
    fn handle(&self, signal: &Signal) -> impl std::future::Future<Output = SignalResult> + Send;

    /// Returns a list of actions — presumably the ones this handler accepts
    /// (not enforced anywhere in the visible code).
    fn supported_actions(&self) -> Vec<SignalAction>;
}
356
/// Lock-free counters describing signal traffic.
///
/// All counters start at zero and are updated and read with
/// `Ordering::Relaxed`: each value is individually consistent, but a reader
/// gets no ordering guarantee between different counters.
#[derive(Debug, Default)]
pub struct SignalStats {
    /// Signals handed in for processing.
    signals_received: AtomicU64,
    /// Signals recorded as processed.
    signals_processed: AtomicU64,
    /// Signals recorded as failed.
    signals_failed: AtomicU64,
    /// Signals recorded as ignored.
    signals_ignored: AtomicU64,
    /// Signals recorded as snapshot signals.
    snapshot_signals: AtomicU64,
    /// Signals recorded as control signals.
    control_signals: AtomicU64,
}

impl SignalStats {
    /// Increments the received counter.
    pub fn record_received(&self) {
        self.signals_received.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the processed counter.
    pub fn record_processed(&self) {
        self.signals_processed.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the failed counter.
    pub fn record_failed(&self) {
        self.signals_failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the ignored counter.
    pub fn record_ignored(&self) {
        self.signals_ignored.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the snapshot-signal counter.
    pub fn record_snapshot(&self) {
        self.snapshot_signals.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the control-signal counter.
    pub fn record_control(&self) {
        self.control_signals.fetch_add(1, Ordering::Relaxed);
    }

    /// Number of signals received so far.
    pub fn received(&self) -> u64 {
        self.signals_received.load(Ordering::Relaxed)
    }

    /// Number of signals processed so far.
    pub fn processed(&self) -> u64 {
        self.signals_processed.load(Ordering::Relaxed)
    }

    /// Number of signals that failed so far.
    pub fn failed(&self) -> u64 {
        self.signals_failed.load(Ordering::Relaxed)
    }

    // The three counters below were previously write-only: the fields are
    // private and had no accessor, so their values could never be observed.

    /// Number of signals ignored so far.
    pub fn ignored(&self) -> u64 {
        self.signals_ignored.load(Ordering::Relaxed)
    }

    /// Number of snapshot signals recorded so far.
    pub fn snapshot_signals(&self) -> u64 {
        self.snapshot_signals.load(Ordering::Relaxed)
    }

    /// Number of control signals recorded so far.
    pub fn control_signals(&self) -> u64 {
        self.control_signals.load(Ordering::Relaxed)
    }
}
420
/// Type-erased signal handler: a thread-safe closure that takes a borrowed
/// signal and returns a boxed, pinned, `Send` future resolving to a
/// `SignalResult`. The returned future carries no lifetime tied to the
/// borrowed signal.
type BoxedHandler = Box<
    dyn Fn(&Signal) -> std::pin::Pin<Box<dyn std::future::Future<Output = SignalResult> + Send>>
        + Send
        + Sync,
>;
427
/// Dispatches signals to built-in behaviour or to registered handlers and
/// keeps statistics about them.
pub struct SignalProcessor {
    /// Registered handlers, keyed by action name.
    handlers: RwLock<HashMap<String, BoxedHandler>>,
    /// Shared traffic counters.
    stats: Arc<SignalStats>,
    /// Set by `pause()` / cleared by `resume()`.
    paused: AtomicBool,
    /// Sources whose signals are accepted; signals from any other source
    /// are ignored by `process`.
    enabled_sources: RwLock<Vec<SignalSource>>,
}
439
440impl Default for SignalProcessor {
441 fn default() -> Self {
442 Self::new()
443 }
444}
445
446impl SignalProcessor {
447 pub fn new() -> Self {
449 Self {
450 handlers: RwLock::new(HashMap::new()),
451 stats: Arc::new(SignalStats::default()),
452 paused: AtomicBool::new(false),
453 enabled_sources: RwLock::new(vec![SignalSource::Api, SignalSource::Source]),
454 }
455 }
456
457 pub fn stats(&self) -> &Arc<SignalStats> {
459 &self.stats
460 }
461
462 pub fn is_paused(&self) -> bool {
464 self.paused.load(Ordering::Relaxed)
465 }
466
467 pub fn pause(&self) {
469 self.paused.store(true, Ordering::Relaxed);
470 info!("Signal processor paused");
471 }
472
473 pub fn resume(&self) {
475 self.paused.store(false, Ordering::Relaxed);
476 info!("Signal processor resumed");
477 }
478
479 pub async fn register_handler<F, Fut>(&self, action: &str, handler: F)
481 where
482 F: Fn(&Signal) -> Fut + Send + Sync + 'static,
483 Fut: std::future::Future<Output = SignalResult> + Send + 'static,
484 {
485 let boxed: BoxedHandler = Box::new(move |signal| Box::pin(handler(signal)));
486 self.handlers
487 .write()
488 .await
489 .insert(action.to_string(), boxed);
490 debug!("Registered handler for action: {}", action);
491 }
492
493 pub async fn set_enabled_sources(&self, sources: Vec<SignalSource>) {
495 *self.enabled_sources.write().await = sources;
496 }
497
498 pub async fn is_source_enabled(&self, source: &SignalSource) -> bool {
500 self.enabled_sources.read().await.contains(source)
501 }
502
503 pub async fn process(&self, signal: Signal) -> SignalResult {
505 self.stats.record_received();
506
507 if !self.is_source_enabled(&signal.source).await {
509 debug!(
510 "Signal source {:?} not enabled, ignoring: {}",
511 signal.source, signal.id
512 );
513 self.stats.record_ignored();
514 return SignalResult::Ignored(format!("Source {:?} not enabled", signal.source));
515 }
516
517 info!(
518 "Processing signal: id={}, action={:?}, source={:?}",
519 signal.id, signal.action, signal.source
520 );
521
522 if signal.is_snapshot_action() {
524 self.stats.record_snapshot();
525 }
526 if signal.is_control_action() {
527 self.stats.record_control();
528 }
529
530 let result = match &signal.action {
532 SignalAction::PauseSnapshot => {
533 self.pause();
534 SignalResult::Success
535 }
536 SignalAction::ResumeSnapshot => {
537 self.resume();
538 SignalResult::Success
539 }
540 SignalAction::Log => {
541 if let Some(msg) = signal.data.log_message() {
542 info!("Signal log message: {}", msg);
543 }
544 SignalResult::Success
545 }
546 _ => {
547 let handlers = self.handlers.read().await;
549 if let Some(handler) = handlers.get(signal.action.as_str()) {
550 handler(&signal).await
551 } else {
552 if signal.is_snapshot_action() {
554 SignalResult::Pending(format!(
555 "Snapshot signal {} queued for processing",
556 signal.id
557 ))
558 } else {
559 warn!("No handler for action: {:?}", signal.action);
560 SignalResult::Ignored(format!("No handler for action: {:?}", signal.action))
561 }
562 }
563 }
564 };
565
566 match &result {
568 SignalResult::Success | SignalResult::Pending(_) => {
569 self.stats.record_processed();
570 }
571 SignalResult::Failed(_) => {
572 self.stats.record_failed();
573 }
574 SignalResult::Ignored(_) => {
575 self.stats.record_ignored();
576 }
577 }
578
579 result
580 }
581
582 pub fn parse_from_row(
584 id: &str,
585 signal_type: &str,
586 data: Option<&str>,
587 ) -> Result<Signal, String> {
588 let action = SignalAction::parse(signal_type);
589
590 let signal_data = if let Some(data_str) = data {
591 serde_json::from_str(data_str)
592 .map_err(|e| format!("Failed to parse signal data: {}", e))?
593 } else {
594 SignalData::empty()
595 };
596
597 Ok(Signal::new(id, action, signal_data).with_source(SignalSource::Source))
598 }
599}
600
601#[derive(Clone)]
603pub struct SignalChannel {
604 sender: tokio::sync::mpsc::Sender<Signal>,
605}
606
607impl SignalChannel {
608 pub fn new(buffer_size: usize) -> (Self, tokio::sync::mpsc::Receiver<Signal>) {
610 let (sender, receiver) = tokio::sync::mpsc::channel(buffer_size);
611 (Self { sender }, receiver)
612 }
613
614 pub async fn send(&self, signal: Signal) -> Result<(), String> {
616 self.sender
617 .send(signal)
618 .await
619 .map_err(|e| format!("Failed to send signal: {}", e))
620 }
621
622 pub fn try_send(&self, signal: Signal) -> Result<(), String> {
624 self.sender
625 .try_send(signal)
626 .map_err(|e| format!("Failed to send signal: {}", e))
627 }
628}
629
630#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
636#[serde(rename_all = "lowercase")]
637#[derive(Default)]
638pub enum SignalChannelType {
639 #[default]
641 Source,
642 Topic,
644 File,
646 Api,
648}
649
650impl SignalChannelType {
651 pub fn as_str(&self) -> &'static str {
653 match self {
654 SignalChannelType::Source => "source",
655 SignalChannelType::Topic => "topic",
656 SignalChannelType::File => "file",
657 SignalChannelType::Api => "api",
658 }
659 }
660
661 pub fn parse(s: &str) -> Option<Self> {
663 match s.to_lowercase().as_str() {
664 "source" => Some(SignalChannelType::Source),
665 "topic" => Some(SignalChannelType::Topic),
666 "file" => Some(SignalChannelType::File),
667 "api" => Some(SignalChannelType::Api),
668 _ => None,
669 }
670 }
671}
672
/// Configuration of signal handling. Every field has a serde default, so an
/// empty document deserializes to the same values as `SignalConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalConfig {
    /// Channel types that are enabled; defaults to `Source` and `Topic`.
    #[serde(default = "default_enabled_channels")]
    pub enabled_channels: Vec<SignalChannelType>,

    /// Name of the signal table, optionally dot-qualified
    /// (see `signal_table_name` / `signal_schema_name`).
    #[serde(default)]
    pub signal_data_collection: Option<String>,

    /// Name of the signal topic, if any.
    #[serde(default)]
    pub signal_topic: Option<String>,

    /// Path of the signal file, if any.
    #[serde(default)]
    pub signal_file: Option<String>,

    /// Poll interval in milliseconds; defaults to 1000.
    #[serde(default = "default_poll_interval_ms")]
    pub signal_poll_interval_ms: u64,

    /// Free-form key/value properties — presumably passed to the topic
    /// consumer (not used in the visible code).
    #[serde(default)]
    pub signal_consumer_properties: HashMap<String, String>,
}
704
705fn default_enabled_channels() -> Vec<SignalChannelType> {
706 vec![SignalChannelType::Source, SignalChannelType::Topic]
707}
708
/// Poll interval used when none is configured: 1000 ms.
fn default_poll_interval_ms() -> u64 {
    const ONE_SECOND_MS: u64 = 1_000;
    ONE_SECOND_MS
}
712
713impl Default for SignalConfig {
714 fn default() -> Self {
715 Self {
716 enabled_channels: default_enabled_channels(),
717 signal_data_collection: None,
718 signal_topic: None,
719 signal_file: None,
720 signal_poll_interval_ms: default_poll_interval_ms(),
721 signal_consumer_properties: HashMap::new(),
722 }
723 }
724}
725
726impl SignalConfig {
727 pub fn builder() -> SignalConfigBuilder {
729 SignalConfigBuilder::default()
730 }
731
732 pub fn is_channel_enabled(&self, channel: SignalChannelType) -> bool {
734 self.enabled_channels.contains(&channel)
735 }
736
737 pub fn signal_table_name(&self) -> Option<&str> {
739 self.signal_data_collection
740 .as_ref()
741 .and_then(|s| s.split('.').next_back())
742 }
743
744 pub fn signal_schema_name(&self) -> Option<&str> {
746 self.signal_data_collection.as_ref().and_then(|s| {
747 let parts: Vec<&str> = s.split('.').collect();
748 if parts.len() >= 2 {
749 Some(parts[0])
750 } else {
751 None
752 }
753 })
754 }
755
756 pub fn parse_enabled_channels(s: &str) -> Vec<SignalChannelType> {
758 s.split(',')
759 .filter_map(|c| SignalChannelType::parse(c.trim()))
760 .collect()
761 }
762}
763
/// Builder for `SignalConfig`. Fields left as `None` fall back to the
/// configuration defaults in `build()`.
#[derive(Debug, Default)]
pub struct SignalConfigBuilder {
    // `None` means "use the default channel list".
    enabled_channels: Option<Vec<SignalChannelType>>,
    signal_data_collection: Option<String>,
    signal_topic: Option<String>,
    signal_file: Option<String>,
    // `None` means "use the default poll interval".
    signal_poll_interval_ms: Option<u64>,
    signal_consumer_properties: HashMap<String, String>,
}
774
775impl SignalConfigBuilder {
776 pub fn enabled_channels(mut self, channels: Vec<SignalChannelType>) -> Self {
778 self.enabled_channels = Some(channels);
779 self
780 }
781
782 pub fn enable_channel(mut self, channel: SignalChannelType) -> Self {
784 self.enabled_channels
785 .get_or_insert_with(Vec::new)
786 .push(channel);
787 self
788 }
789
790 pub fn signal_data_collection(mut self, collection: impl Into<String>) -> Self {
792 self.signal_data_collection = Some(collection.into());
793 self
794 }
795
796 pub fn signal_topic(mut self, topic: impl Into<String>) -> Self {
798 self.signal_topic = Some(topic.into());
799 self
800 }
801
802 pub fn signal_file(mut self, path: impl Into<String>) -> Self {
804 self.signal_file = Some(path.into());
805 self
806 }
807
808 pub fn signal_poll_interval_ms(mut self, ms: u64) -> Self {
810 self.signal_poll_interval_ms = Some(ms);
811 self
812 }
813
814 pub fn consumer_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
816 self.signal_consumer_properties
817 .insert(key.into(), value.into());
818 self
819 }
820
821 pub fn build(self) -> SignalConfig {
823 SignalConfig {
824 enabled_channels: self
825 .enabled_channels
826 .unwrap_or_else(default_enabled_channels),
827 signal_data_collection: self.signal_data_collection,
828 signal_topic: self.signal_topic,
829 signal_file: self.signal_file,
830 signal_poll_interval_ms: self
831 .signal_poll_interval_ms
832 .unwrap_or_else(default_poll_interval_ms),
833 signal_consumer_properties: self.signal_consumer_properties,
834 }
835 }
836}
837
/// Raw, not yet parsed signal as read from a channel.
#[derive(Debug, Clone)]
pub struct SignalRecord {
    /// Signal identifier.
    pub id: String,
    /// Action name; parsed with `SignalAction::parse` in `to_signal`.
    pub signal_type: String,
    /// Optional JSON document for the signal data.
    pub data: Option<String>,
    /// Optional position information; not interpreted in the visible code.
    pub offset: Option<String>,
}
854
855impl SignalRecord {
856 pub fn new(id: impl Into<String>, signal_type: impl Into<String>) -> Self {
858 Self {
859 id: id.into(),
860 signal_type: signal_type.into(),
861 data: None,
862 offset: None,
863 }
864 }
865
866 pub fn with_data(mut self, data: impl Into<String>) -> Self {
868 self.data = Some(data.into());
869 self
870 }
871
872 pub fn with_offset(mut self, offset: impl Into<String>) -> Self {
874 self.offset = Some(offset.into());
875 self
876 }
877
878 pub fn to_signal(&self, source: SignalSource) -> Result<Signal, String> {
880 let action = SignalAction::parse(&self.signal_type);
881 let signal_data = if let Some(data_str) = &self.data {
882 serde_json::from_str(data_str)
883 .map_err(|e| format!("Failed to parse signal data: {}", e))?
884 } else {
885 SignalData::empty()
886 };
887 Ok(Signal::new(&self.id, action, signal_data).with_source(source))
888 }
889}
890
/// A pollable source of raw signal records.
#[async_trait::async_trait]
pub trait SignalChannelReader: Send + Sync {
    /// Name of the channel. `SignalManager::poll` maps `"source"`, `"file"`
    /// and `"topic"` to the matching `SignalSource`; any other name is
    /// treated as `SignalSource::Api`.
    fn name(&self) -> &str;

    /// Prepares the channel for reading.
    async fn init(&mut self) -> Result<(), String>;

    /// Returns the records that became available since the previous call.
    async fn read(&mut self) -> Result<Vec<SignalRecord>, String>;

    /// Called by `SignalManager::poll` after a signal was processed
    /// successfully. The default implementation does nothing.
    async fn acknowledge(&mut self, _signal_id: &str) -> Result<(), String> {
        Ok(())
    }

    /// Shuts the channel down.
    async fn close(&mut self) -> Result<(), String>;
}
913
/// Signal channel fed from change events on a signal table: events are
/// queued with `handle_cdc_event` and drained by `read`.
pub struct SourceSignalChannel {
    /// Name of the signal table, either `schema.table` or just `table`.
    signal_table: String,
    /// Records queued since the last `read`; shared via `pending_signals`.
    pending: Arc<RwLock<Vec<SignalRecord>>>,
    /// Set by `init`, cleared by `close`; not read in the visible code.
    initialized: bool,
}
941
942impl SourceSignalChannel {
943 pub fn new(signal_table: impl Into<String>) -> Self {
945 Self {
946 signal_table: signal_table.into(),
947 pending: Arc::new(RwLock::new(Vec::new())),
948 initialized: false,
949 }
950 }
951
952 pub fn pending_signals(&self) -> Arc<RwLock<Vec<SignalRecord>>> {
954 Arc::clone(&self.pending)
955 }
956
957 pub fn is_signal_event(&self, schema: &str, table: &str) -> bool {
959 let expected = format!("{}.{}", schema, table);
960 self.signal_table == expected || self.signal_table == table
961 }
962
963 pub async fn handle_cdc_event(
965 &self,
966 id: &str,
967 signal_type: &str,
968 data: Option<&str>,
969 ) -> Result<(), String> {
970 let record = SignalRecord {
971 id: id.to_string(),
972 signal_type: signal_type.to_string(),
973 data: data.map(|s| s.to_string()),
974 offset: None,
975 };
976 self.pending.write().await.push(record);
977 debug!(
978 "Source channel: detected signal {} of type {}",
979 id, signal_type
980 );
981 Ok(())
982 }
983}
984
985#[async_trait::async_trait]
986impl SignalChannelReader for SourceSignalChannel {
987 fn name(&self) -> &str {
988 "source"
989 }
990
991 async fn init(&mut self) -> Result<(), String> {
992 info!(
993 "Source signal channel initialized for table: {}",
994 self.signal_table
995 );
996 self.initialized = true;
997 Ok(())
998 }
999
1000 async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
1001 let mut pending = self.pending.write().await;
1003 let signals = std::mem::take(&mut *pending);
1004 if !signals.is_empty() {
1005 debug!("Source channel: returning {} signals", signals.len());
1006 }
1007 Ok(signals)
1008 }
1009
1010 async fn close(&mut self) -> Result<(), String> {
1011 info!("Source signal channel closed");
1012 self.initialized = false;
1013 Ok(())
1014 }
1015}
1016
/// Signal channel that reads signals from a file with one JSON object per
/// line.
pub struct FileSignalChannel {
    /// Path of the signal file.
    path: std::path::PathBuf,
    /// Ids of signals already returned by `read`, so that each id is
    /// delivered once.
    processed: std::collections::HashSet<String>,
    /// Modification time seen by the previous `read`; an unchanged time
    /// makes `read` return nothing.
    last_modified: Option<std::time::SystemTime>,
    /// Set by `init`, cleared by `close`; not read in the visible code.
    initialized: bool,
}
1039
1040impl FileSignalChannel {
1041 pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
1043 Self {
1044 path: path.into(),
1045 processed: std::collections::HashSet::new(),
1046 last_modified: None,
1047 initialized: false,
1048 }
1049 }
1050}
1051
1052#[async_trait::async_trait]
1053impl SignalChannelReader for FileSignalChannel {
1054 fn name(&self) -> &str {
1055 "file"
1056 }
1057
1058 async fn init(&mut self) -> Result<(), String> {
1059 if !self.path.exists() {
1060 tokio::fs::write(&self.path, "")
1062 .await
1063 .map_err(|e| format!("Failed to create signal file: {}", e))?;
1064 }
1065 info!("File signal channel initialized: {:?}", self.path);
1066 self.initialized = true;
1067 Ok(())
1068 }
1069
1070 async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
1071 let metadata = tokio::fs::metadata(&self.path)
1073 .await
1074 .map_err(|e| format!("Failed to read signal file metadata: {}", e))?;
1075
1076 let modified = metadata
1077 .modified()
1078 .map_err(|e| format!("Failed to get file modification time: {}", e))?;
1079
1080 if self.last_modified == Some(modified) {
1081 return Ok(Vec::new()); }
1083 self.last_modified = Some(modified);
1084
1085 let content = tokio::fs::read_to_string(&self.path)
1087 .await
1088 .map_err(|e| format!("Failed to read signal file: {}", e))?;
1089
1090 let mut signals = Vec::new();
1091 for line in content.lines() {
1092 let line = line.trim();
1093 if line.is_empty() || line.starts_with('#') {
1094 continue;
1095 }
1096
1097 #[derive(Deserialize)]
1099 struct FileSignal {
1100 id: String,
1101 #[serde(rename = "type")]
1102 signal_type: String,
1103 data: Option<serde_json::Value>,
1104 }
1105
1106 match serde_json::from_str::<FileSignal>(line) {
1107 Ok(fs) => {
1108 if !self.processed.contains(&fs.id) {
1109 let record = SignalRecord {
1110 id: fs.id.clone(),
1111 signal_type: fs.signal_type,
1112 data: fs.data.map(|v| v.to_string()),
1113 offset: None,
1114 };
1115 signals.push(record);
1116 self.processed.insert(fs.id);
1117 }
1118 }
1119 Err(e) => {
1120 warn!("Failed to parse signal line: {} - {}", line, e);
1121 }
1122 }
1123 }
1124
1125 if !signals.is_empty() {
1126 debug!("File channel: read {} new signals", signals.len());
1127 }
1128
1129 Ok(signals)
1130 }
1131
1132 async fn close(&mut self) -> Result<(), String> {
1133 info!("File signal channel closed");
1134 self.initialized = false;
1135 Ok(())
1136 }
1137}
1138
/// Owns a set of signal channels and feeds what they deliver into a
/// `SignalProcessor`.
pub struct SignalManager {
    /// Channels polled, in insertion order, by `poll`.
    channels: Vec<Box<dyn SignalChannelReader>>,
    /// Processor that receives every parsed signal.
    processor: Arc<SignalProcessor>,
    /// Configuration the manager was created with.
    config: SignalConfig,
    /// Set to `true` by a successful `init` and to `false` by `close`.
    running: Arc<AtomicBool>,
}
1154
1155impl SignalManager {
1156 pub fn new(config: SignalConfig, processor: Arc<SignalProcessor>) -> Self {
1158 Self {
1159 channels: Vec::new(),
1160 processor,
1161 config,
1162 running: Arc::new(AtomicBool::new(false)),
1163 }
1164 }
1165
1166 pub fn add_channel(&mut self, channel: Box<dyn SignalChannelReader>) {
1168 info!("Adding signal channel: {}", channel.name());
1169 self.channels.push(channel);
1170 }
1171
1172 pub async fn init(&mut self) -> Result<(), String> {
1174 for channel in &mut self.channels {
1175 channel.init().await?;
1176 }
1177 self.running.store(true, Ordering::SeqCst);
1178 info!(
1179 "Signal manager initialized with {} channels",
1180 self.channels.len()
1181 );
1182 Ok(())
1183 }
1184
1185 pub async fn poll(&mut self) -> Result<usize, String> {
1187 let mut total = 0;
1188
1189 for channel in &mut self.channels {
1190 let records = channel.read().await?;
1191 for record in records {
1192 let source = match channel.name() {
1193 "source" => SignalSource::Source,
1194 "file" => SignalSource::File,
1195 "topic" => SignalSource::Topic,
1196 _ => SignalSource::Api,
1197 };
1198
1199 match record.to_signal(source) {
1200 Ok(signal) => {
1201 let result = self.processor.process(signal).await;
1202 if result.is_success() {
1203 if let Err(e) = channel.acknowledge(&record.id).await {
1205 warn!("Failed to acknowledge signal {}: {}", record.id, e);
1206 }
1207 }
1208 total += 1;
1209 }
1210 Err(e) => {
1211 warn!("Failed to parse signal {}: {}", record.id, e);
1212 }
1213 }
1214 }
1215 }
1216
1217 Ok(total)
1218 }
1219
1220 pub async fn close(&mut self) -> Result<(), String> {
1222 self.running.store(false, Ordering::SeqCst);
1223 for channel in &mut self.channels {
1224 if let Err(e) = channel.close().await {
1225 warn!("Failed to close channel {}: {}", channel.name(), e);
1226 }
1227 }
1228 info!("Signal manager closed");
1229 Ok(())
1230 }
1231
1232 pub fn is_running(&self) -> bool {
1234 self.running.load(Ordering::SeqCst)
1235 }
1236
1237 pub fn config(&self) -> &SignalConfig {
1239 &self.config
1240 }
1241
1242 pub fn processor(&self) -> &Arc<SignalProcessor> {
1244 &self.processor
1245 }
1246
1247 pub fn create_source_channel(&self) -> Option<SourceSignalChannel> {
1249 if self.config.is_channel_enabled(SignalChannelType::Source) {
1250 self.config
1251 .signal_data_collection
1252 .as_ref()
1253 .map(SourceSignalChannel::new)
1254 } else {
1255 None
1256 }
1257 }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262 use super::*;
1263
1264 #[test]
1265 fn test_signal_action_str() {
1266 assert_eq!(SignalAction::ExecuteSnapshot.as_str(), "execute-snapshot");
1267 assert_eq!(SignalAction::StopSnapshot.as_str(), "stop-snapshot");
1268 assert_eq!(SignalAction::PauseSnapshot.as_str(), "pause-snapshot");
1269 assert_eq!(SignalAction::ResumeSnapshot.as_str(), "resume-snapshot");
1270 assert_eq!(SignalAction::Log.as_str(), "log");
1271 assert_eq!(
1272 SignalAction::Custom("my-action".to_string()).as_str(),
1273 "my-action"
1274 );
1275 }
1276
1277 #[test]
1278 fn test_signal_action_parse() {
1279 assert_eq!(
1280 SignalAction::parse("execute-snapshot"),
1281 SignalAction::ExecuteSnapshot
1282 );
1283 assert_eq!(
1284 SignalAction::parse("pause-snapshot"),
1285 SignalAction::PauseSnapshot
1286 );
1287 assert_eq!(
1288 SignalAction::parse("unknown"),
1289 SignalAction::Custom("unknown".to_string())
1290 );
1291 }
1292
1293 #[test]
1294 fn test_signal_data_empty() {
1295 let data = SignalData::empty();
1296 assert!(data.data_collections.is_empty());
1297 assert!(data.snapshot_type.is_none());
1298 assert!(data.properties.is_empty());
1299 }
1300
1301 #[test]
1302 fn test_signal_data_for_snapshot() {
1303 let data = SignalData::for_snapshot(
1304 vec!["public.users".to_string(), "public.orders".to_string()],
1305 "incremental",
1306 );
1307 assert_eq!(data.data_collections.len(), 2);
1308 assert_eq!(data.snapshot_type, Some("incremental".to_string()));
1309 }
1310
1311 #[test]
1312 fn test_signal_data_for_log() {
1313 let data = SignalData::for_log("Test message");
1314 assert_eq!(data.log_message(), Some("Test message"));
1315 }
1316
1317 #[test]
1318 fn test_signal_data_properties() {
1319 let data = SignalData::empty()
1320 .with_property("key1", serde_json::json!("value1"))
1321 .with_property("key2", serde_json::json!(42));
1322
1323 assert_eq!(
1324 data.get_property("key1"),
1325 Some(&serde_json::json!("value1"))
1326 );
1327 assert_eq!(data.get_property("key2"), Some(&serde_json::json!(42)));
1328 assert_eq!(data.get_property("key3"), None);
1329 }
1330
1331 #[test]
1332 fn test_signal_execute_snapshot() {
1333 let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1334
1335 assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1336 assert_eq!(signal.data.data_collections, vec!["public.users"]);
1337 assert_eq!(signal.data.snapshot_type, Some("incremental".to_string()));
1338 assert!(signal.is_snapshot_action());
1339 assert!(!signal.is_control_action());
1340 }
1341
1342 #[test]
1343 fn test_signal_blocking_snapshot() {
1344 let signal = Signal::blocking_snapshot(vec!["public.orders".to_string()]);
1345
1346 assert_eq!(signal.data.snapshot_type, Some("blocking".to_string()));
1347 }
1348
1349 #[test]
1350 fn test_signal_stop_snapshot() {
1351 let signal = Signal::stop_snapshot();
1352
1353 assert_eq!(signal.action, SignalAction::StopSnapshot);
1354 assert!(signal.is_snapshot_action());
1355 }
1356
1357 #[test]
1358 fn test_signal_pause() {
1359 let signal = Signal::pause();
1360
1361 assert_eq!(signal.action, SignalAction::PauseSnapshot);
1362 assert!(signal.is_control_action());
1363 assert!(!signal.is_snapshot_action());
1364 }
1365
1366 #[test]
1367 fn test_signal_resume() {
1368 let signal = Signal::resume();
1369
1370 assert_eq!(signal.action, SignalAction::ResumeSnapshot);
1371 assert!(signal.is_control_action());
1372 }
1373
1374 #[test]
1375 fn test_signal_log() {
1376 let signal = Signal::log("Hello, CDC!");
1377
1378 assert_eq!(signal.action, SignalAction::Log);
1379 assert_eq!(signal.data.log_message(), Some("Hello, CDC!"));
1380 }
1381
1382 #[test]
1383 fn test_signal_custom() {
1384 let data =
1385 SignalData::empty().with_property("custom_field", serde_json::json!("custom_value"));
1386 let signal = Signal::custom("my-custom-action", data);
1387
1388 assert_eq!(
1389 signal.action,
1390 SignalAction::Custom("my-custom-action".to_string())
1391 );
1392 }
1393
1394 #[test]
1395 fn test_signal_with_source() {
1396 let signal = Signal::pause().with_source(SignalSource::Topic);
1397 assert_eq!(signal.source, SignalSource::Topic);
1398 }
1399
1400 #[test]
1401 fn test_signal_result() {
1402 assert!(SignalResult::Success.is_success());
1403 assert!(SignalResult::Pending("waiting".to_string()).is_success());
1404 assert!(!SignalResult::Failed("error".to_string()).is_success());
1405 assert!(!SignalResult::Ignored("skipped".to_string()).is_success());
1406
1407 assert_eq!(
1408 SignalResult::Failed("error msg".to_string()).error_message(),
1409 Some("error msg")
1410 );
1411 assert_eq!(SignalResult::Success.error_message(), None);
1412 }
1413
1414 #[test]
1415 fn test_signal_stats() {
1416 let stats = SignalStats::default();
1417
1418 stats.record_received();
1419 stats.record_received();
1420 assert_eq!(stats.received(), 2);
1421
1422 stats.record_processed();
1423 assert_eq!(stats.processed(), 1);
1424
1425 stats.record_failed();
1426 assert_eq!(stats.failed(), 1);
1427
1428 stats.record_snapshot();
1429 stats.record_control();
1430 }
1431
1432 #[tokio::test]
1433 async fn test_signal_processor_new() {
1434 let processor = SignalProcessor::new();
1435
1436 assert!(!processor.is_paused());
1437 assert_eq!(processor.stats().received(), 0);
1438 }
1439
1440 #[tokio::test]
1441 async fn test_signal_processor_pause_resume() {
1442 let processor = SignalProcessor::new();
1443
1444 assert!(!processor.is_paused());
1445
1446 processor.pause();
1447 assert!(processor.is_paused());
1448
1449 processor.resume();
1450 assert!(!processor.is_paused());
1451 }
1452
1453 #[tokio::test]
1454 async fn test_signal_processor_process_pause() {
1455 let processor = SignalProcessor::new();
1456
1457 let result = processor.process(Signal::pause()).await;
1458
1459 assert!(result.is_success());
1460 assert!(processor.is_paused());
1461 }
1462
1463 #[tokio::test]
1464 async fn test_signal_processor_process_resume() {
1465 let processor = SignalProcessor::new();
1466 processor.pause();
1467
1468 let result = processor.process(Signal::resume()).await;
1469
1470 assert!(result.is_success());
1471 assert!(!processor.is_paused());
1472 }
1473
1474 #[tokio::test]
1475 async fn test_signal_processor_process_log() {
1476 let processor = SignalProcessor::new();
1477
1478 let result = processor.process(Signal::log("Test log")).await;
1479
1480 assert!(result.is_success());
1481 }
1482
1483 #[tokio::test]
1484 async fn test_signal_processor_custom_handler() {
1485 let processor = SignalProcessor::new();
1486
1487 processor
1488 .register_handler("custom-action", |_signal| async { SignalResult::Success })
1489 .await;
1490
1491 let signal = Signal::custom("custom-action", SignalData::empty());
1492 let result = processor.process(signal).await;
1493
1494 assert!(result.is_success());
1495 }
1496
1497 #[tokio::test]
1498 async fn test_signal_processor_source_filtering() {
1499 let processor = SignalProcessor::new();
1500 processor.set_enabled_sources(vec![SignalSource::Api]).await;
1501
1502 let api_signal = Signal::pause().with_source(SignalSource::Api);
1504 let result = processor.process(api_signal).await;
1505 assert!(result.is_success());
1506
1507 let topic_signal = Signal::pause().with_source(SignalSource::Topic);
1509 let result = processor.process(topic_signal).await;
1510 assert!(matches!(result, SignalResult::Ignored(_)));
1511 }
1512
1513 #[tokio::test]
1514 async fn test_signal_processor_snapshot_pending() {
1515 let processor = SignalProcessor::new();
1516
1517 let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1518 let result = processor.process(signal).await;
1519
1520 assert!(matches!(result, SignalResult::Pending(_)));
1522 }
1523
1524 #[tokio::test]
1525 async fn test_signal_processor_stats() {
1526 let processor = SignalProcessor::new();
1527
1528 processor.process(Signal::log("msg1")).await;
1529 processor.process(Signal::pause()).await;
1530 processor.process(Signal::resume()).await;
1531
1532 assert_eq!(processor.stats().received(), 3);
1533 assert_eq!(processor.stats().processed(), 3);
1534 }
1535
1536 #[test]
1537 fn test_parse_from_row() {
1538 let signal = SignalProcessor::parse_from_row(
1539 "sig-1",
1540 "execute-snapshot",
1541 Some(r#"{"data-collections": ["public.users"]}"#),
1542 )
1543 .unwrap();
1544
1545 assert_eq!(signal.id, "sig-1");
1546 assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1547 assert_eq!(signal.source, SignalSource::Source);
1548 }
1549
1550 #[test]
1551 fn test_parse_from_row_no_data() {
1552 let signal = SignalProcessor::parse_from_row("sig-2", "pause-snapshot", None).unwrap();
1553
1554 assert_eq!(signal.id, "sig-2");
1555 assert_eq!(signal.action, SignalAction::PauseSnapshot);
1556 }
1557
1558 #[test]
1559 fn test_parse_from_row_invalid_json() {
1560 let result = SignalProcessor::parse_from_row("sig-3", "log", Some("not valid json"));
1561
1562 assert!(result.is_err());
1563 }
1564
1565 #[tokio::test]
1566 async fn test_signal_channel() {
1567 let (channel, mut receiver) = SignalChannel::new(16);
1568
1569 channel.send(Signal::pause()).await.unwrap();
1570 channel.send(Signal::resume()).await.unwrap();
1571
1572 let sig1 = receiver.recv().await.unwrap();
1573 let sig2 = receiver.recv().await.unwrap();
1574
1575 assert_eq!(sig1.action, SignalAction::PauseSnapshot);
1576 assert_eq!(sig2.action, SignalAction::ResumeSnapshot);
1577 }
1578
1579 #[tokio::test]
1580 async fn test_signal_channel_try_send() {
1581 let (channel, _receiver) = SignalChannel::new(2);
1582
1583 assert!(channel.try_send(Signal::pause()).is_ok());
1584 assert!(channel.try_send(Signal::resume()).is_ok());
1585 assert!(channel.try_send(Signal::log("overflow")).is_err());
1587 }
1588
1589 #[test]
1590 fn test_signal_serialization() {
1591 let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1592 let json = serde_json::to_string(&signal).unwrap();
1593
1594 assert!(json.contains("execute-snapshot"));
1595 assert!(json.contains("public.users"));
1596
1597 let parsed: Signal = serde_json::from_str(&json).unwrap();
1598 assert_eq!(parsed.action, SignalAction::ExecuteSnapshot);
1599 }
1600
1601 #[test]
1606 fn test_signal_channel_type_str() {
1607 assert_eq!(SignalChannelType::Source.as_str(), "source");
1608 assert_eq!(SignalChannelType::Topic.as_str(), "topic");
1609 assert_eq!(SignalChannelType::File.as_str(), "file");
1610 assert_eq!(SignalChannelType::Api.as_str(), "api");
1611 }
1612
1613 #[test]
1614 fn test_signal_channel_type_parse() {
1615 assert_eq!(
1616 SignalChannelType::parse("source"),
1617 Some(SignalChannelType::Source)
1618 );
1619 assert_eq!(
1620 SignalChannelType::parse("topic"),
1621 Some(SignalChannelType::Topic)
1622 );
1623 assert_eq!(
1624 SignalChannelType::parse("file"),
1625 Some(SignalChannelType::File)
1626 );
1627 assert_eq!(SignalChannelType::parse("unknown"), None);
1628 }
1629
1630 #[test]
1631 fn test_signal_config_default() {
1632 let config = SignalConfig::default();
1633
1634 assert!(config.is_channel_enabled(SignalChannelType::Source));
1635 assert!(config.is_channel_enabled(SignalChannelType::Topic));
1636 assert!(!config.is_channel_enabled(SignalChannelType::File));
1637 assert!(config.signal_data_collection.is_none());
1638 assert!(config.signal_topic.is_none());
1639 }
1640
1641 #[test]
1642 fn test_signal_config_builder() {
1643 let config = SignalConfig::builder()
1644 .enabled_channels(vec![SignalChannelType::Source, SignalChannelType::File])
1645 .signal_data_collection("public.rivven_signal")
1646 .signal_file("/tmp/signals.json")
1647 .signal_poll_interval_ms(500)
1648 .consumer_property("bootstrap.servers", "localhost:9092")
1649 .build();
1650
1651 assert!(config.is_channel_enabled(SignalChannelType::Source));
1652 assert!(config.is_channel_enabled(SignalChannelType::File));
1653 assert!(!config.is_channel_enabled(SignalChannelType::Topic));
1654 assert_eq!(
1655 config.signal_data_collection,
1656 Some("public.rivven_signal".to_string())
1657 );
1658 assert_eq!(config.signal_file, Some("/tmp/signals.json".to_string()));
1659 assert_eq!(config.signal_poll_interval_ms, 500);
1660 }
1661
1662 #[test]
1663 fn test_signal_config_table_name() {
1664 let config = SignalConfig::builder()
1665 .signal_data_collection("public.rivven_signal")
1666 .build();
1667
1668 assert_eq!(config.signal_table_name(), Some("rivven_signal"));
1669 assert_eq!(config.signal_schema_name(), Some("public"));
1670 }
1671
1672 #[test]
1673 fn test_signal_config_parse_channels() {
1674 let channels = SignalConfig::parse_enabled_channels("source, topic, file");
1675
1676 assert_eq!(channels.len(), 3);
1677 assert!(channels.contains(&SignalChannelType::Source));
1678 assert!(channels.contains(&SignalChannelType::Topic));
1679 assert!(channels.contains(&SignalChannelType::File));
1680 }
1681
1682 #[test]
1683 fn test_signal_record_to_signal() {
1684 let record = SignalRecord::new("sig-1", "execute-snapshot")
1685 .with_data(r#"{"data-collections": ["public.users"]}"#);
1686
1687 let signal = record.to_signal(SignalSource::Source).unwrap();
1688
1689 assert_eq!(signal.id, "sig-1");
1690 assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1691 assert_eq!(signal.source, SignalSource::Source);
1692 assert_eq!(signal.data.data_collections, vec!["public.users"]);
1693 }
1694
1695 #[test]
1696 fn test_signal_record_to_signal_no_data() {
1697 let record = SignalRecord::new("sig-2", "pause-snapshot");
1698
1699 let signal = record.to_signal(SignalSource::File).unwrap();
1700
1701 assert_eq!(signal.id, "sig-2");
1702 assert_eq!(signal.action, SignalAction::PauseSnapshot);
1703 assert_eq!(signal.source, SignalSource::File);
1704 }
1705
1706 #[test]
1707 fn test_signal_record_invalid_json() {
1708 let record = SignalRecord::new("sig-3", "log").with_data("not valid json");
1709
1710 assert!(record.to_signal(SignalSource::Api).is_err());
1711 }
1712
1713 #[tokio::test]
1714 async fn test_source_signal_channel() {
1715 let mut channel = SourceSignalChannel::new("public.rivven_signal");
1716
1717 channel.init().await.unwrap();
1719 assert_eq!(channel.name(), "source");
1720
1721 let signals = channel.read().await.unwrap();
1723 assert!(signals.is_empty());
1724
1725 channel
1727 .handle_cdc_event(
1728 "sig-1",
1729 "execute-snapshot",
1730 Some(r#"{"data-collections": ["public.orders"]}"#),
1731 )
1732 .await
1733 .unwrap();
1734
1735 let signals = channel.read().await.unwrap();
1737 assert_eq!(signals.len(), 1);
1738 assert_eq!(signals[0].id, "sig-1");
1739 assert_eq!(signals[0].signal_type, "execute-snapshot");
1740
1741 let signals = channel.read().await.unwrap();
1743 assert!(signals.is_empty());
1744
1745 channel.close().await.unwrap();
1747 }
1748
1749 #[test]
1750 fn test_source_signal_channel_is_signal_event() {
1751 let channel = SourceSignalChannel::new("public.rivven_signal");
1752
1753 assert!(channel.is_signal_event("public", "rivven_signal"));
1754 assert!(!channel.is_signal_event("public", "users"));
1755 assert!(!channel.is_signal_event("other", "rivven_signal"));
1756 }
1757
1758 #[tokio::test]
1759 async fn test_file_signal_channel() {
1760 let temp_dir = std::env::temp_dir();
1762 let signal_file = temp_dir.join(format!("rivven_signals_{}.json", uuid::Uuid::new_v4()));
1763
1764 let content = r#"{"id":"sig-1","type":"execute-snapshot","data":{"data-collections":["public.users"]}}
1766{"id":"sig-2","type":"pause-snapshot"}
1767# This is a comment
1768{"id":"sig-3","type":"log","data":{"message":"Hello"}}"#;
1769 tokio::fs::write(&signal_file, content).await.unwrap();
1770
1771 let mut channel = FileSignalChannel::new(&signal_file);
1772
1773 channel.init().await.unwrap();
1775 assert_eq!(channel.name(), "file");
1776
1777 let signals = channel.read().await.unwrap();
1779 assert_eq!(signals.len(), 3);
1780 assert_eq!(signals[0].id, "sig-1");
1781 assert_eq!(signals[1].id, "sig-2");
1782 assert_eq!(signals[2].id, "sig-3");
1783
1784 let signals = channel.read().await.unwrap();
1786 assert!(signals.is_empty());
1787
1788 channel.close().await.unwrap();
1790 let _ = tokio::fs::remove_file(&signal_file).await;
1791 }
1792
    /// End-to-end check of `SignalManager` with a single source channel.
    ///
    /// A `pause-snapshot` record is pushed into the source channel's pending
    /// list; one `poll()` is expected to report a count of 1 and leave the
    /// shared processor paused. The manager reports running after `init()`
    /// and not running after `close()`.
    #[tokio::test]
    async fn test_signal_manager() {
        let config = SignalConfig::builder()
            .enabled_channels(vec![SignalChannelType::Source])
            .signal_data_collection("public.rivven_signal")
            .build();

        // The manager gets a clone of the `Arc`; the test keeps its own handle
        // so it can inspect the processor's state after polling.
        let processor = Arc::new(SignalProcessor::new());
        let mut manager = SignalManager::new(config, processor.clone());

        let source_channel = manager.create_source_channel().unwrap();
        // Take the pending-signals handle before the channel is moved into the
        // manager below.
        let pending = source_channel.pending_signals();
        manager.add_channel(Box::new(source_channel));

        manager.init().await.unwrap();
        assert!(manager.is_running());

        // The value returned by `write().await` is a temporary dropped at the
        // end of this statement, i.e. before `poll()` runs.
        // NOTE(review): presumably a lock guard over a shared Vec<SignalRecord> - confirm.
        pending
            .write()
            .await
            .push(SignalRecord::new("sig-1", "pause-snapshot"));

        let count = manager.poll().await.unwrap();
        assert_eq!(count, 1);

        // The polled pause signal must have reached the shared processor.
        assert!(processor.is_paused());

        manager.close().await.unwrap();
        assert!(!manager.is_running());
    }
1829}