Skip to main content

rivven_cdc/common/
signal.rs

1//! # CDC Signaling
2//!
3//! Multi-channel control system for CDC connectors.
4//!
5//! ## Features
6//!
7//! - **Ad-hoc Snapshots**: Trigger snapshots on demand
8//! - **Pause/Resume**: Control streaming without restarting
9//! - **Incremental Snapshots**: Chunk-based table re-snapshots
10//! - **Custom Signals**: Application-defined signal handlers
11//! - **Multi-Channel**: Source table, Topic, File, API channels
12//!
13//! ## Signal Channels
14//!
15//! Rivven supports multiple signal channels:
16//!
17//! | Channel | Description | Use Case |
18//! |---------|-------------|----------|
19//! | `source` | Signal table captured via CDC stream | Default, required for incremental snapshots |
20//! | `topic` | Signals from a Rivven topic | Avoids database writes |
21//! | `file` | Signals from a JSON file | Simple deployments |
22//! | `api` | HTTP/gRPC API calls | Programmatic control |
23//!
24//! The **source channel is enabled by default** because it implements the watermarking
25//! mechanism for incremental snapshot deduplication. Signals flow through the CDC stream,
26//! so no separate database connection is required.
27//!
28//! ## Signal Table Schema
29//!
30//! ```sql
31//! CREATE TABLE rivven_signal (
32//!     id VARCHAR(42) PRIMARY KEY,
33//!     type VARCHAR(32) NOT NULL,
34//!     data VARCHAR(2048) NULL
35//! );
36//! ```
37//!
38//! ## Configuration
39//!
40//! ```rust,ignore
41//! use rivven_cdc::common::signal::{SignalConfig, SignalChannel as ChannelType};
42//!
43//! let config = SignalConfig::builder()
44//!     .enabled_channels(vec![ChannelType::Source, ChannelType::Topic])
45//!     .signal_data_collection("public.rivven_signal")  // Source channel table
46//!     .signal_topic("cdc-signals")                     // Topic channel
47//!     .build();
48//! ```
49//!
50//! ## Usage
51//!
52//! ```rust,ignore
53//! use rivven_cdc::common::signal::{Signal, SignalProcessor, SignalAction};
54//!
55//! let processor = SignalProcessor::new();
56//!
57//! // Register custom handler
58//! processor.register_handler("custom-action", |signal| {
59//!     println!("Custom signal: {:?}", signal);
60//!     Ok(())
61//! });
62//!
63//! // Process a signal
64//! let signal = Signal::execute_snapshot(vec!["public.orders".to_string()]);
65//! processor.process(signal).await?;
66//! ```
67
68use serde::{Deserialize, Serialize};
69use std::collections::HashMap;
70use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
71use std::sync::Arc;
72use tokio::sync::RwLock;
73use tracing::{debug, info, warn};
74use uuid::Uuid;
75
76/// Signal action types.
77#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78#[serde(rename_all = "kebab-case")]
79pub enum SignalAction {
80    /// Execute a snapshot for specified tables
81    ExecuteSnapshot,
82    /// Stop an in-progress snapshot
83    StopSnapshot,
84    /// Pause streaming
85    PauseSnapshot,
86    /// Resume streaming
87    ResumeSnapshot,
88    /// Log a message (diagnostic)
89    Log,
90    /// Custom action
91    Custom(String),
92}
93
94impl SignalAction {
95    /// Get the action name as string.
96    pub fn as_str(&self) -> &str {
97        match self {
98            SignalAction::ExecuteSnapshot => "execute-snapshot",
99            SignalAction::StopSnapshot => "stop-snapshot",
100            SignalAction::PauseSnapshot => "pause-snapshot",
101            SignalAction::ResumeSnapshot => "resume-snapshot",
102            SignalAction::Log => "log",
103            SignalAction::Custom(name) => name,
104        }
105    }
106
107    /// Parse action from string.
108    pub fn parse(s: &str) -> Self {
109        match s {
110            "execute-snapshot" => SignalAction::ExecuteSnapshot,
111            "stop-snapshot" => SignalAction::StopSnapshot,
112            "pause-snapshot" => SignalAction::PauseSnapshot,
113            "resume-snapshot" => SignalAction::ResumeSnapshot,
114            "log" => SignalAction::Log,
115            other => SignalAction::Custom(other.to_string()),
116        }
117    }
118}
119
120/// Signal data payload.
121#[derive(Debug, Clone, Default, Serialize, Deserialize)]
122pub struct SignalData {
123    /// Tables to snapshot (for snapshot actions)
124    #[serde(default, rename = "data-collections")]
125    pub data_collections: Vec<String>,
126    /// Snapshot type (blocking, incremental)
127    #[serde(default, rename = "type")]
128    pub snapshot_type: Option<String>,
129    /// Additional properties
130    #[serde(default, flatten)]
131    pub properties: HashMap<String, serde_json::Value>,
132}
133
134impl SignalData {
135    /// Create empty signal data.
136    pub fn empty() -> Self {
137        Self {
138            data_collections: Vec::new(),
139            snapshot_type: None,
140            properties: HashMap::new(),
141        }
142    }
143
144    /// Create signal data for snapshot.
145    pub fn for_snapshot(tables: Vec<String>, snapshot_type: &str) -> Self {
146        Self {
147            data_collections: tables,
148            snapshot_type: Some(snapshot_type.to_string()),
149            properties: HashMap::new(),
150        }
151    }
152
153    /// Create signal data for log message.
154    pub fn for_log(message: &str) -> Self {
155        let mut properties = HashMap::new();
156        properties.insert(
157            "message".to_string(),
158            serde_json::Value::String(message.to_string()),
159        );
160        Self {
161            data_collections: Vec::new(),
162            snapshot_type: None,
163            properties,
164        }
165    }
166
167    /// Add a property.
168    pub fn with_property(mut self, key: &str, value: serde_json::Value) -> Self {
169        self.properties.insert(key.to_string(), value);
170        self
171    }
172
173    /// Get a property value.
174    pub fn get_property(&self, key: &str) -> Option<&serde_json::Value> {
175        self.properties.get(key)
176    }
177
178    /// Get log message if present.
179    pub fn log_message(&self) -> Option<&str> {
180        self.properties.get("message")?.as_str()
181    }
182}
183
184/// A CDC signal.
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct Signal {
187    /// Unique signal identifier
188    pub id: String,
189    /// Signal action type
190    #[serde(rename = "type")]
191    pub action: SignalAction,
192    /// Signal data/payload
193    #[serde(default)]
194    pub data: SignalData,
195    /// Timestamp when signal was created
196    #[serde(default = "default_timestamp")]
197    pub timestamp: i64,
198    /// Source of the signal
199    #[serde(default)]
200    pub source: SignalSource,
201}
202
203fn default_timestamp() -> i64 {
204    chrono::Utc::now().timestamp_millis()
205}
206
207impl Signal {
208    /// Create a new signal.
209    pub fn new(id: impl Into<String>, action: SignalAction, data: SignalData) -> Self {
210        Self {
211            id: id.into(),
212            action,
213            data,
214            timestamp: chrono::Utc::now().timestamp_millis(),
215            source: SignalSource::Api,
216        }
217    }
218
219    /// Create an execute-snapshot signal.
220    pub fn execute_snapshot(tables: Vec<String>) -> Self {
221        Self::new(
222            Uuid::new_v4().to_string(),
223            SignalAction::ExecuteSnapshot,
224            SignalData::for_snapshot(tables, "incremental"),
225        )
226    }
227
228    /// Create a blocking snapshot signal.
229    pub fn blocking_snapshot(tables: Vec<String>) -> Self {
230        Self::new(
231            Uuid::new_v4().to_string(),
232            SignalAction::ExecuteSnapshot,
233            SignalData::for_snapshot(tables, "blocking"),
234        )
235    }
236
237    /// Create a stop-snapshot signal.
238    pub fn stop_snapshot() -> Self {
239        Self::new(
240            Uuid::new_v4().to_string(),
241            SignalAction::StopSnapshot,
242            SignalData::empty(),
243        )
244    }
245
246    /// Create a pause signal.
247    pub fn pause() -> Self {
248        Self::new(
249            Uuid::new_v4().to_string(),
250            SignalAction::PauseSnapshot,
251            SignalData::empty(),
252        )
253    }
254
255    /// Create a resume signal.
256    pub fn resume() -> Self {
257        Self::new(
258            Uuid::new_v4().to_string(),
259            SignalAction::ResumeSnapshot,
260            SignalData::empty(),
261        )
262    }
263
264    /// Create a log signal.
265    pub fn log(message: &str) -> Self {
266        Self::new(
267            Uuid::new_v4().to_string(),
268            SignalAction::Log,
269            SignalData::for_log(message),
270        )
271    }
272
273    /// Create a custom signal.
274    pub fn custom(action: &str, data: SignalData) -> Self {
275        Self::new(
276            Uuid::new_v4().to_string(),
277            SignalAction::Custom(action.to_string()),
278            data,
279        )
280    }
281
282    /// Set signal source.
283    pub fn with_source(mut self, source: SignalSource) -> Self {
284        self.source = source;
285        self
286    }
287
288    /// Check if this is a snapshot action.
289    pub fn is_snapshot_action(&self) -> bool {
290        matches!(
291            self.action,
292            SignalAction::ExecuteSnapshot | SignalAction::StopSnapshot
293        )
294    }
295
296    /// Check if this is a pause/resume action.
297    pub fn is_control_action(&self) -> bool {
298        matches!(
299            self.action,
300            SignalAction::PauseSnapshot | SignalAction::ResumeSnapshot
301        )
302    }
303}
304
305/// Source of the signal.
306#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
307#[serde(rename_all = "lowercase")]
308pub enum SignalSource {
309    /// Signal from API call
310    #[default]
311    Api,
312    /// Signal from database table
313    Source,
314    /// Signal from Rivven topic
315    Topic,
316    /// Signal from file
317    File,
318}
319
320/// Result of processing a signal.
321#[derive(Debug, Clone)]
322pub enum SignalResult {
323    /// Signal processed successfully
324    Success,
325    /// Signal acknowledged but pending
326    Pending(String),
327    /// Signal ignored (not applicable)
328    Ignored(String),
329    /// Signal failed
330    Failed(String),
331}
332
333impl SignalResult {
334    /// Check if successful.
335    pub fn is_success(&self) -> bool {
336        matches!(self, SignalResult::Success | SignalResult::Pending(_))
337    }
338
339    /// Get error message if failed.
340    pub fn error_message(&self) -> Option<&str> {
341        match self {
342            SignalResult::Failed(msg) => Some(msg),
343            _ => None,
344        }
345    }
346}
347
348/// Signal handler trait.
349pub trait SignalHandler: Send + Sync {
350    /// Handle a signal.
351    fn handle(&self, signal: &Signal) -> impl std::future::Future<Output = SignalResult> + Send;
352
353    /// Get supported action types.
354    fn supported_actions(&self) -> Vec<SignalAction>;
355}
356
357/// Signal processing statistics.
358#[derive(Debug, Default)]
359pub struct SignalStats {
360    /// Total signals received
361    signals_received: AtomicU64,
362    /// Signals processed successfully
363    signals_processed: AtomicU64,
364    /// Signals that failed
365    signals_failed: AtomicU64,
366    /// Signals ignored
367    signals_ignored: AtomicU64,
368    /// Snapshot signals
369    snapshot_signals: AtomicU64,
370    /// Control signals (pause/resume)
371    control_signals: AtomicU64,
372}
373
374impl SignalStats {
375    /// Record signal received.
376    pub fn record_received(&self) {
377        self.signals_received.fetch_add(1, Ordering::Relaxed);
378    }
379
380    /// Record signal processed.
381    pub fn record_processed(&self) {
382        self.signals_processed.fetch_add(1, Ordering::Relaxed);
383    }
384
385    /// Record signal failed.
386    pub fn record_failed(&self) {
387        self.signals_failed.fetch_add(1, Ordering::Relaxed);
388    }
389
390    /// Record signal ignored.
391    pub fn record_ignored(&self) {
392        self.signals_ignored.fetch_add(1, Ordering::Relaxed);
393    }
394
395    /// Record snapshot signal.
396    pub fn record_snapshot(&self) {
397        self.snapshot_signals.fetch_add(1, Ordering::Relaxed);
398    }
399
400    /// Record control signal.
401    pub fn record_control(&self) {
402        self.control_signals.fetch_add(1, Ordering::Relaxed);
403    }
404
405    /// Get total received.
406    pub fn received(&self) -> u64 {
407        self.signals_received.load(Ordering::Relaxed)
408    }
409
410    /// Get total processed.
411    pub fn processed(&self) -> u64 {
412        self.signals_processed.load(Ordering::Relaxed)
413    }
414
415    /// Get total failed.
416    pub fn failed(&self) -> u64 {
417        self.signals_failed.load(Ordering::Relaxed)
418    }
419}
420
421/// Boxed async handler function.
422type BoxedHandler = Box<
423    dyn Fn(&Signal) -> std::pin::Pin<Box<dyn std::future::Future<Output = SignalResult> + Send>>
424        + Send
425        + Sync,
426>;
427
428/// Signal processor for handling CDC signals.
429pub struct SignalProcessor {
430    /// Custom handlers keyed by action name
431    handlers: RwLock<HashMap<String, BoxedHandler>>,
432    /// Statistics
433    stats: Arc<SignalStats>,
434    /// Whether processing is paused
435    paused: AtomicBool,
436    /// Enabled signal sources
437    enabled_sources: RwLock<Vec<SignalSource>>,
438}
439
440impl Default for SignalProcessor {
441    fn default() -> Self {
442        Self::new()
443    }
444}
445
446impl SignalProcessor {
447    /// Create a new signal processor.
448    pub fn new() -> Self {
449        Self {
450            handlers: RwLock::new(HashMap::new()),
451            stats: Arc::new(SignalStats::default()),
452            paused: AtomicBool::new(false),
453            enabled_sources: RwLock::new(vec![SignalSource::Api, SignalSource::Source]),
454        }
455    }
456
457    /// Get statistics.
458    pub fn stats(&self) -> &Arc<SignalStats> {
459        &self.stats
460    }
461
462    /// Check if processing is paused.
463    pub fn is_paused(&self) -> bool {
464        self.paused.load(Ordering::Relaxed)
465    }
466
467    /// Pause processing.
468    pub fn pause(&self) {
469        self.paused.store(true, Ordering::Relaxed);
470        info!("Signal processor paused");
471    }
472
473    /// Resume processing.
474    pub fn resume(&self) {
475        self.paused.store(false, Ordering::Relaxed);
476        info!("Signal processor resumed");
477    }
478
479    /// Register a custom handler for an action.
480    pub async fn register_handler<F, Fut>(&self, action: &str, handler: F)
481    where
482        F: Fn(&Signal) -> Fut + Send + Sync + 'static,
483        Fut: std::future::Future<Output = SignalResult> + Send + 'static,
484    {
485        let boxed: BoxedHandler = Box::new(move |signal| Box::pin(handler(signal)));
486        self.handlers
487            .write()
488            .await
489            .insert(action.to_string(), boxed);
490        debug!("Registered handler for action: {}", action);
491    }
492
493    /// Set enabled signal sources.
494    pub async fn set_enabled_sources(&self, sources: Vec<SignalSource>) {
495        *self.enabled_sources.write().await = sources;
496    }
497
498    /// Check if signal source is enabled.
499    pub async fn is_source_enabled(&self, source: &SignalSource) -> bool {
500        self.enabled_sources.read().await.contains(source)
501    }
502
503    /// Process a signal.
504    pub async fn process(&self, signal: Signal) -> SignalResult {
505        self.stats.record_received();
506
507        // Check if source is enabled
508        if !self.is_source_enabled(&signal.source).await {
509            debug!(
510                "Signal source {:?} not enabled, ignoring: {}",
511                signal.source, signal.id
512            );
513            self.stats.record_ignored();
514            return SignalResult::Ignored(format!("Source {:?} not enabled", signal.source));
515        }
516
517        info!(
518            "Processing signal: id={}, action={:?}, source={:?}",
519            signal.id, signal.action, signal.source
520        );
521
522        // Track signal types
523        if signal.is_snapshot_action() {
524            self.stats.record_snapshot();
525        }
526        if signal.is_control_action() {
527            self.stats.record_control();
528        }
529
530        // Handle built-in control actions
531        let result = match &signal.action {
532            SignalAction::PauseSnapshot => {
533                self.pause();
534                SignalResult::Success
535            }
536            SignalAction::ResumeSnapshot => {
537                self.resume();
538                SignalResult::Success
539            }
540            SignalAction::Log => {
541                if let Some(msg) = signal.data.log_message() {
542                    info!("Signal log message: {}", msg);
543                }
544                SignalResult::Success
545            }
546            _ => {
547                // Check for custom handler
548                let handlers = self.handlers.read().await;
549                if let Some(handler) = handlers.get(signal.action.as_str()) {
550                    handler(&signal).await
551                } else {
552                    // No handler - return pending for snapshot actions
553                    if signal.is_snapshot_action() {
554                        SignalResult::Pending(format!(
555                            "Snapshot signal {} queued for processing",
556                            signal.id
557                        ))
558                    } else {
559                        warn!("No handler for action: {:?}", signal.action);
560                        SignalResult::Ignored(format!("No handler for action: {:?}", signal.action))
561                    }
562                }
563            }
564        };
565
566        // Update stats based on result
567        match &result {
568            SignalResult::Success | SignalResult::Pending(_) => {
569                self.stats.record_processed();
570            }
571            SignalResult::Failed(_) => {
572                self.stats.record_failed();
573            }
574            SignalResult::Ignored(_) => {
575                self.stats.record_ignored();
576            }
577        }
578
579        result
580    }
581
582    /// Parse signal from table row.
583    pub fn parse_from_row(
584        id: &str,
585        signal_type: &str,
586        data: Option<&str>,
587    ) -> Result<Signal, String> {
588        let action = SignalAction::parse(signal_type);
589
590        let signal_data = if let Some(data_str) = data {
591            serde_json::from_str(data_str)
592                .map_err(|e| format!("Failed to parse signal data: {}", e))?
593        } else {
594            SignalData::empty()
595        };
596
597        Ok(Signal::new(id, action, signal_data).with_source(SignalSource::Source))
598    }
599}
600
601/// Signal channel for communication between components.
602#[derive(Clone)]
603pub struct SignalChannel {
604    sender: tokio::sync::mpsc::Sender<Signal>,
605}
606
607impl SignalChannel {
608    /// Create a new signal channel.
609    pub fn new(buffer_size: usize) -> (Self, tokio::sync::mpsc::Receiver<Signal>) {
610        let (sender, receiver) = tokio::sync::mpsc::channel(buffer_size);
611        (Self { sender }, receiver)
612    }
613
614    /// Send a signal.
615    pub async fn send(&self, signal: Signal) -> Result<(), String> {
616        self.sender
617            .send(signal)
618            .await
619            .map_err(|e| format!("Failed to send signal: {}", e))
620    }
621
622    /// Try to send a signal without blocking.
623    pub fn try_send(&self, signal: Signal) -> Result<(), String> {
624        self.sender
625            .try_send(signal)
626            .map_err(|e| format!("Failed to send signal: {}", e))
627    }
628}
629
630// ============================================================================
631// Signal Channel Configuration
632// ============================================================================
633
634/// Signal channel types.
635#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
636#[serde(rename_all = "lowercase")]
637#[derive(Default)]
638pub enum SignalChannelType {
639    /// Source channel - signal table captured via CDC stream (default)
640    #[default]
641    Source,
642    /// Topic channel - signals from Rivven topic
643    Topic,
644    /// File channel - signals from a JSON file
645    File,
646    /// API channel - signals from HTTP/gRPC
647    Api,
648}
649
650impl SignalChannelType {
651    /// Get the channel name as string.
652    pub fn as_str(&self) -> &'static str {
653        match self {
654            SignalChannelType::Source => "source",
655            SignalChannelType::Topic => "topic",
656            SignalChannelType::File => "file",
657            SignalChannelType::Api => "api",
658        }
659    }
660
661    /// Parse from string.
662    pub fn parse(s: &str) -> Option<Self> {
663        match s.to_lowercase().as_str() {
664            "source" => Some(SignalChannelType::Source),
665            "topic" => Some(SignalChannelType::Topic),
666            "file" => Some(SignalChannelType::File),
667            "api" => Some(SignalChannelType::Api),
668            _ => None,
669        }
670    }
671}
672
673/// Configuration for CDC signaling.
674#[derive(Debug, Clone, Serialize, Deserialize)]
675pub struct SignalConfig {
676    /// Enabled signal channels (default: source, topic)
677    #[serde(default = "default_enabled_channels")]
678    pub enabled_channels: Vec<SignalChannelType>,
679
680    /// Fully-qualified name of signal data collection (for source channel)
681    /// Format: schema.table (PostgreSQL) or database.table (MySQL)
682    /// Default: None (disabled)
683    #[serde(default)]
684    pub signal_data_collection: Option<String>,
685
686    /// Topic for signal messages (for topic channel)
687    /// Default: None (disabled)
688    #[serde(default)]
689    pub signal_topic: Option<String>,
690
691    /// Path to signal file (for file channel)
692    /// Default: None (disabled)
693    #[serde(default)]
694    pub signal_file: Option<String>,
695
696    /// Poll interval for file channel
697    #[serde(default = "default_poll_interval_ms")]
698    pub signal_poll_interval_ms: u64,
699
700    /// Consumer properties for topic channel
701    #[serde(default)]
702    pub signal_consumer_properties: HashMap<String, String>,
703}
704
705fn default_enabled_channels() -> Vec<SignalChannelType> {
706    vec![SignalChannelType::Source, SignalChannelType::Topic]
707}
708
709fn default_poll_interval_ms() -> u64 {
710    1000 // 1 second
711}
712
713impl Default for SignalConfig {
714    fn default() -> Self {
715        Self {
716            enabled_channels: default_enabled_channels(),
717            signal_data_collection: None,
718            signal_topic: None,
719            signal_file: None,
720            signal_poll_interval_ms: default_poll_interval_ms(),
721            signal_consumer_properties: HashMap::new(),
722        }
723    }
724}
725
726impl SignalConfig {
727    /// Create a new builder.
728    pub fn builder() -> SignalConfigBuilder {
729        SignalConfigBuilder::default()
730    }
731
732    /// Check if a channel is enabled.
733    pub fn is_channel_enabled(&self, channel: SignalChannelType) -> bool {
734        self.enabled_channels.contains(&channel)
735    }
736
737    /// Get the signal table name (without schema).
738    pub fn signal_table_name(&self) -> Option<&str> {
739        self.signal_data_collection
740            .as_ref()
741            .and_then(|s| s.split('.').next_back())
742    }
743
744    /// Get the signal schema name.
745    pub fn signal_schema_name(&self) -> Option<&str> {
746        self.signal_data_collection.as_ref().and_then(|s| {
747            let parts: Vec<&str> = s.split('.').collect();
748            if parts.len() >= 2 {
749                Some(parts[0])
750            } else {
751                None
752            }
753        })
754    }
755
756    /// Parse enabled channels from comma-separated string.
757    pub fn parse_enabled_channels(s: &str) -> Vec<SignalChannelType> {
758        s.split(',')
759            .filter_map(|c| SignalChannelType::parse(c.trim()))
760            .collect()
761    }
762}
763
764/// Builder for SignalConfig.
765#[derive(Debug, Default)]
766pub struct SignalConfigBuilder {
767    enabled_channels: Option<Vec<SignalChannelType>>,
768    signal_data_collection: Option<String>,
769    signal_topic: Option<String>,
770    signal_file: Option<String>,
771    signal_poll_interval_ms: Option<u64>,
772    signal_consumer_properties: HashMap<String, String>,
773}
774
775impl SignalConfigBuilder {
776    /// Set enabled channels.
777    pub fn enabled_channels(mut self, channels: Vec<SignalChannelType>) -> Self {
778        self.enabled_channels = Some(channels);
779        self
780    }
781
782    /// Enable specific channel.
783    pub fn enable_channel(mut self, channel: SignalChannelType) -> Self {
784        self.enabled_channels
785            .get_or_insert_with(Vec::new)
786            .push(channel);
787        self
788    }
789
790    /// Set signal data collection (table name).
791    pub fn signal_data_collection(mut self, collection: impl Into<String>) -> Self {
792        self.signal_data_collection = Some(collection.into());
793        self
794    }
795
796    /// Set signal topic.
797    pub fn signal_topic(mut self, topic: impl Into<String>) -> Self {
798        self.signal_topic = Some(topic.into());
799        self
800    }
801
802    /// Set signal file path.
803    pub fn signal_file(mut self, path: impl Into<String>) -> Self {
804        self.signal_file = Some(path.into());
805        self
806    }
807
808    /// Set poll interval for file channel.
809    pub fn signal_poll_interval_ms(mut self, ms: u64) -> Self {
810        self.signal_poll_interval_ms = Some(ms);
811        self
812    }
813
814    /// Add consumer property for topic channel.
815    pub fn consumer_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
816        self.signal_consumer_properties
817            .insert(key.into(), value.into());
818        self
819    }
820
821    /// Build the configuration.
822    pub fn build(self) -> SignalConfig {
823        SignalConfig {
824            enabled_channels: self
825                .enabled_channels
826                .unwrap_or_else(default_enabled_channels),
827            signal_data_collection: self.signal_data_collection,
828            signal_topic: self.signal_topic,
829            signal_file: self.signal_file,
830            signal_poll_interval_ms: self
831                .signal_poll_interval_ms
832                .unwrap_or_else(default_poll_interval_ms),
833            signal_consumer_properties: self.signal_consumer_properties,
834        }
835    }
836}
837
838// ============================================================================
839// Signal Channel Reader Trait
840// ============================================================================
841
842/// Signal record from a channel - minimal data needed.
843#[derive(Debug, Clone)]
844pub struct SignalRecord {
845    /// Signal ID
846    pub id: String,
847    /// Signal type (action)
848    pub signal_type: String,
849    /// Signal data (JSON string)
850    pub data: Option<String>,
851    /// Source offset (for acknowledgment)
852    pub offset: Option<String>,
853}
854
855impl SignalRecord {
856    /// Create a new signal record.
857    pub fn new(id: impl Into<String>, signal_type: impl Into<String>) -> Self {
858        Self {
859            id: id.into(),
860            signal_type: signal_type.into(),
861            data: None,
862            offset: None,
863        }
864    }
865
866    /// Set data.
867    pub fn with_data(mut self, data: impl Into<String>) -> Self {
868        self.data = Some(data.into());
869        self
870    }
871
872    /// Set offset.
873    pub fn with_offset(mut self, offset: impl Into<String>) -> Self {
874        self.offset = Some(offset.into());
875        self
876    }
877
878    /// Convert to Signal.
879    pub fn to_signal(&self, source: SignalSource) -> Result<Signal, String> {
880        let action = SignalAction::parse(&self.signal_type);
881        let signal_data = if let Some(data_str) = &self.data {
882            serde_json::from_str(data_str)
883                .map_err(|e| format!("Failed to parse signal data: {}", e))?
884        } else {
885            SignalData::empty()
886        };
887        Ok(Signal::new(&self.id, action, signal_data).with_source(source))
888    }
889}
890
891/// Trait for reading signals from a channel.
892///
893/// Implementations provide different signal sources (database, topic, file, etc.).
894#[async_trait::async_trait]
895pub trait SignalChannelReader: Send + Sync {
896    /// Get the channel name.
897    fn name(&self) -> &str;
898
899    /// Initialize the channel reader.
900    async fn init(&mut self) -> Result<(), String>;
901
902    /// Read available signals. Returns empty vec if none available.
903    async fn read(&mut self) -> Result<Vec<SignalRecord>, String>;
904
905    /// Acknowledge a signal (optional - for channels that track consumption).
906    async fn acknowledge(&mut self, _signal_id: &str) -> Result<(), String> {
907        Ok(()) // Default: no-op
908    }
909
910    /// Close the channel reader.
911    async fn close(&mut self) -> Result<(), String>;
912}
913
914// ============================================================================
915// Source Signal Channel (CDC Stream)
916// ============================================================================
917
918/// Source signal channel - detects signals from CDC stream.
919///
920/// This channel watches for changes to the signal table through the normal
921/// CDC replication stream. No separate database connection is required.
922///
923/// When a row is inserted into the signal table:
924/// 1. The INSERT flows through logical replication like any other change
925/// 2. The CDC connector detects it's from the signal table
926/// 3. The signal is extracted and processed
927///
928/// This is the default and recommended channel because:
929/// - Signal ordering is guaranteed (part of the change stream)
930/// - Watermarking for incremental snapshots works correctly
931/// - No additional connections or polling required
932/// - Works with read replicas (signals replicate like other data)
933pub struct SourceSignalChannel {
934    /// Signal table name (fully qualified)
935    signal_table: String,
936    /// Pending signals detected from CDC stream
937    pending: Arc<RwLock<Vec<SignalRecord>>>,
938    /// Whether the channel is initialized
939    initialized: bool,
940}
941
942impl SourceSignalChannel {
943    /// Create a new source signal channel.
944    pub fn new(signal_table: impl Into<String>) -> Self {
945        Self {
946            signal_table: signal_table.into(),
947            pending: Arc::new(RwLock::new(Vec::new())),
948            initialized: false,
949        }
950    }
951
952    /// Get a reference to pending signals for CDC integration.
953    pub fn pending_signals(&self) -> Arc<RwLock<Vec<SignalRecord>>> {
954        Arc::clone(&self.pending)
955    }
956
957    /// Check if a CDC event is from the signal table.
958    pub fn is_signal_event(&self, schema: &str, table: &str) -> bool {
959        let expected = format!("{}.{}", schema, table);
960        self.signal_table == expected || self.signal_table == table
961    }
962
963    /// Extract signal from CDC event (called by CDC connector).
964    pub async fn handle_cdc_event(
965        &self,
966        id: &str,
967        signal_type: &str,
968        data: Option<&str>,
969    ) -> Result<(), String> {
970        let record = SignalRecord {
971            id: id.to_string(),
972            signal_type: signal_type.to_string(),
973            data: data.map(|s| s.to_string()),
974            offset: None,
975        };
976        self.pending.write().await.push(record);
977        debug!(
978            "Source channel: detected signal {} of type {}",
979            id, signal_type
980        );
981        Ok(())
982    }
983}
984
985#[async_trait::async_trait]
986impl SignalChannelReader for SourceSignalChannel {
987    fn name(&self) -> &str {
988        "source"
989    }
990
991    async fn init(&mut self) -> Result<(), String> {
992        info!(
993            "Source signal channel initialized for table: {}",
994            self.signal_table
995        );
996        self.initialized = true;
997        Ok(())
998    }
999
1000    async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
1001        // Drain pending signals detected from CDC stream
1002        let mut pending = self.pending.write().await;
1003        let signals = std::mem::take(&mut *pending);
1004        if !signals.is_empty() {
1005            debug!("Source channel: returning {} signals", signals.len());
1006        }
1007        Ok(signals)
1008    }
1009
1010    async fn close(&mut self) -> Result<(), String> {
1011        info!("Source signal channel closed");
1012        self.initialized = false;
1013        Ok(())
1014    }
1015}
1016
1017// ============================================================================
1018// File Signal Channel
1019// ============================================================================
1020
1021/// File signal channel - reads signals from a JSON file.
1022///
1023/// The file should contain JSON signal objects, one per line:
1024/// ```json
1025/// {"id":"sig-1","type":"execute-snapshot","data":{"data-collections":["public.users"]}}
1026/// ```
1027///
1028/// Processed signals are tracked to avoid reprocessing.
1029pub struct FileSignalChannel {
1030    /// Path to signal file
1031    path: std::path::PathBuf,
1032    /// Processed signal IDs
1033    processed: std::collections::HashSet<String>,
1034    /// Last file modification time
1035    last_modified: Option<std::time::SystemTime>,
1036    /// Whether initialized
1037    initialized: bool,
1038}
1039
1040impl FileSignalChannel {
1041    /// Create a new file signal channel.
1042    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
1043        Self {
1044            path: path.into(),
1045            processed: std::collections::HashSet::new(),
1046            last_modified: None,
1047            initialized: false,
1048        }
1049    }
1050}
1051
1052#[async_trait::async_trait]
1053impl SignalChannelReader for FileSignalChannel {
1054    fn name(&self) -> &str {
1055        "file"
1056    }
1057
1058    async fn init(&mut self) -> Result<(), String> {
1059        if !self.path.exists() {
1060            // Create empty file if it doesn't exist
1061            tokio::fs::write(&self.path, "")
1062                .await
1063                .map_err(|e| format!("Failed to create signal file: {}", e))?;
1064        }
1065        info!("File signal channel initialized: {:?}", self.path);
1066        self.initialized = true;
1067        Ok(())
1068    }
1069
1070    async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
1071        // Check if file has been modified
1072        let metadata = tokio::fs::metadata(&self.path)
1073            .await
1074            .map_err(|e| format!("Failed to read signal file metadata: {}", e))?;
1075
1076        let modified = metadata
1077            .modified()
1078            .map_err(|e| format!("Failed to get file modification time: {}", e))?;
1079
1080        if self.last_modified == Some(modified) {
1081            return Ok(Vec::new()); // No changes
1082        }
1083        self.last_modified = Some(modified);
1084
1085        // Read and parse file
1086        let content = tokio::fs::read_to_string(&self.path)
1087            .await
1088            .map_err(|e| format!("Failed to read signal file: {}", e))?;
1089
1090        let mut signals = Vec::new();
1091        for line in content.lines() {
1092            let line = line.trim();
1093            if line.is_empty() || line.starts_with('#') {
1094                continue;
1095            }
1096
1097            // Parse JSON line
1098            #[derive(Deserialize)]
1099            struct FileSignal {
1100                id: String,
1101                #[serde(rename = "type")]
1102                signal_type: String,
1103                data: Option<serde_json::Value>,
1104            }
1105
1106            match serde_json::from_str::<FileSignal>(line) {
1107                Ok(fs) => {
1108                    if !self.processed.contains(&fs.id) {
1109                        let record = SignalRecord {
1110                            id: fs.id.clone(),
1111                            signal_type: fs.signal_type,
1112                            data: fs.data.map(|v| v.to_string()),
1113                            offset: None,
1114                        };
1115                        signals.push(record);
1116                        self.processed.insert(fs.id);
1117                    }
1118                }
1119                Err(e) => {
1120                    warn!("Failed to parse signal line: {} - {}", line, e);
1121                }
1122            }
1123        }
1124
1125        if !signals.is_empty() {
1126            debug!("File channel: read {} new signals", signals.len());
1127        }
1128
1129        Ok(signals)
1130    }
1131
1132    async fn close(&mut self) -> Result<(), String> {
1133        info!("File signal channel closed");
1134        self.initialized = false;
1135        Ok(())
1136    }
1137}
1138
1139// ============================================================================
1140// Multi-Channel Signal Manager
1141// ============================================================================
1142
1143/// Manages multiple signal channels.
1144pub struct SignalManager {
1145    /// Active channel readers
1146    channels: Vec<Box<dyn SignalChannelReader>>,
1147    /// Signal processor
1148    processor: Arc<SignalProcessor>,
1149    /// Configuration
1150    config: SignalConfig,
1151    /// Running flag
1152    running: Arc<AtomicBool>,
1153}
1154
1155impl SignalManager {
1156    /// Create a new signal manager.
1157    pub fn new(config: SignalConfig, processor: Arc<SignalProcessor>) -> Self {
1158        Self {
1159            channels: Vec::new(),
1160            processor,
1161            config,
1162            running: Arc::new(AtomicBool::new(false)),
1163        }
1164    }
1165
1166    /// Add a channel reader.
1167    pub fn add_channel(&mut self, channel: Box<dyn SignalChannelReader>) {
1168        info!("Adding signal channel: {}", channel.name());
1169        self.channels.push(channel);
1170    }
1171
1172    /// Initialize all channels.
1173    pub async fn init(&mut self) -> Result<(), String> {
1174        for channel in &mut self.channels {
1175            channel.init().await?;
1176        }
1177        self.running.store(true, Ordering::SeqCst);
1178        info!(
1179            "Signal manager initialized with {} channels",
1180            self.channels.len()
1181        );
1182        Ok(())
1183    }
1184
1185    /// Poll all channels and process signals.
1186    pub async fn poll(&mut self) -> Result<usize, String> {
1187        let mut total = 0;
1188
1189        for channel in &mut self.channels {
1190            let records = channel.read().await?;
1191            for record in records {
1192                let source = match channel.name() {
1193                    "source" => SignalSource::Source,
1194                    "file" => SignalSource::File,
1195                    "topic" => SignalSource::Topic,
1196                    _ => SignalSource::Api,
1197                };
1198
1199                match record.to_signal(source) {
1200                    Ok(signal) => {
1201                        let result = self.processor.process(signal).await;
1202                        if result.is_success() {
1203                            // Acknowledge successful processing
1204                            if let Err(e) = channel.acknowledge(&record.id).await {
1205                                warn!("Failed to acknowledge signal {}: {}", record.id, e);
1206                            }
1207                        }
1208                        total += 1;
1209                    }
1210                    Err(e) => {
1211                        warn!("Failed to parse signal {}: {}", record.id, e);
1212                    }
1213                }
1214            }
1215        }
1216
1217        Ok(total)
1218    }
1219
1220    /// Close all channels.
1221    pub async fn close(&mut self) -> Result<(), String> {
1222        self.running.store(false, Ordering::SeqCst);
1223        for channel in &mut self.channels {
1224            if let Err(e) = channel.close().await {
1225                warn!("Failed to close channel {}: {}", channel.name(), e);
1226            }
1227        }
1228        info!("Signal manager closed");
1229        Ok(())
1230    }
1231
1232    /// Check if running.
1233    pub fn is_running(&self) -> bool {
1234        self.running.load(Ordering::SeqCst)
1235    }
1236
1237    /// Get config reference.
1238    pub fn config(&self) -> &SignalConfig {
1239        &self.config
1240    }
1241
1242    /// Get processor reference.
1243    pub fn processor(&self) -> &Arc<SignalProcessor> {
1244        &self.processor
1245    }
1246
1247    /// Create a source channel for CDC integration.
1248    pub fn create_source_channel(&self) -> Option<SourceSignalChannel> {
1249        if self.config.is_channel_enabled(SignalChannelType::Source) {
1250            self.config
1251                .signal_data_collection
1252                .as_ref()
1253                .map(SourceSignalChannel::new)
1254        } else {
1255            None
1256        }
1257    }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263
1264    #[test]
1265    fn test_signal_action_str() {
1266        assert_eq!(SignalAction::ExecuteSnapshot.as_str(), "execute-snapshot");
1267        assert_eq!(SignalAction::StopSnapshot.as_str(), "stop-snapshot");
1268        assert_eq!(SignalAction::PauseSnapshot.as_str(), "pause-snapshot");
1269        assert_eq!(SignalAction::ResumeSnapshot.as_str(), "resume-snapshot");
1270        assert_eq!(SignalAction::Log.as_str(), "log");
1271        assert_eq!(
1272            SignalAction::Custom("my-action".to_string()).as_str(),
1273            "my-action"
1274        );
1275    }
1276
1277    #[test]
1278    fn test_signal_action_parse() {
1279        assert_eq!(
1280            SignalAction::parse("execute-snapshot"),
1281            SignalAction::ExecuteSnapshot
1282        );
1283        assert_eq!(
1284            SignalAction::parse("pause-snapshot"),
1285            SignalAction::PauseSnapshot
1286        );
1287        assert_eq!(
1288            SignalAction::parse("unknown"),
1289            SignalAction::Custom("unknown".to_string())
1290        );
1291    }
1292
1293    #[test]
1294    fn test_signal_data_empty() {
1295        let data = SignalData::empty();
1296        assert!(data.data_collections.is_empty());
1297        assert!(data.snapshot_type.is_none());
1298        assert!(data.properties.is_empty());
1299    }
1300
1301    #[test]
1302    fn test_signal_data_for_snapshot() {
1303        let data = SignalData::for_snapshot(
1304            vec!["public.users".to_string(), "public.orders".to_string()],
1305            "incremental",
1306        );
1307        assert_eq!(data.data_collections.len(), 2);
1308        assert_eq!(data.snapshot_type, Some("incremental".to_string()));
1309    }
1310
1311    #[test]
1312    fn test_signal_data_for_log() {
1313        let data = SignalData::for_log("Test message");
1314        assert_eq!(data.log_message(), Some("Test message"));
1315    }
1316
1317    #[test]
1318    fn test_signal_data_properties() {
1319        let data = SignalData::empty()
1320            .with_property("key1", serde_json::json!("value1"))
1321            .with_property("key2", serde_json::json!(42));
1322
1323        assert_eq!(
1324            data.get_property("key1"),
1325            Some(&serde_json::json!("value1"))
1326        );
1327        assert_eq!(data.get_property("key2"), Some(&serde_json::json!(42)));
1328        assert_eq!(data.get_property("key3"), None);
1329    }
1330
1331    #[test]
1332    fn test_signal_execute_snapshot() {
1333        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1334
1335        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1336        assert_eq!(signal.data.data_collections, vec!["public.users"]);
1337        assert_eq!(signal.data.snapshot_type, Some("incremental".to_string()));
1338        assert!(signal.is_snapshot_action());
1339        assert!(!signal.is_control_action());
1340    }
1341
1342    #[test]
1343    fn test_signal_blocking_snapshot() {
1344        let signal = Signal::blocking_snapshot(vec!["public.orders".to_string()]);
1345
1346        assert_eq!(signal.data.snapshot_type, Some("blocking".to_string()));
1347    }
1348
1349    #[test]
1350    fn test_signal_stop_snapshot() {
1351        let signal = Signal::stop_snapshot();
1352
1353        assert_eq!(signal.action, SignalAction::StopSnapshot);
1354        assert!(signal.is_snapshot_action());
1355    }
1356
1357    #[test]
1358    fn test_signal_pause() {
1359        let signal = Signal::pause();
1360
1361        assert_eq!(signal.action, SignalAction::PauseSnapshot);
1362        assert!(signal.is_control_action());
1363        assert!(!signal.is_snapshot_action());
1364    }
1365
1366    #[test]
1367    fn test_signal_resume() {
1368        let signal = Signal::resume();
1369
1370        assert_eq!(signal.action, SignalAction::ResumeSnapshot);
1371        assert!(signal.is_control_action());
1372    }
1373
1374    #[test]
1375    fn test_signal_log() {
1376        let signal = Signal::log("Hello, CDC!");
1377
1378        assert_eq!(signal.action, SignalAction::Log);
1379        assert_eq!(signal.data.log_message(), Some("Hello, CDC!"));
1380    }
1381
1382    #[test]
1383    fn test_signal_custom() {
1384        let data =
1385            SignalData::empty().with_property("custom_field", serde_json::json!("custom_value"));
1386        let signal = Signal::custom("my-custom-action", data);
1387
1388        assert_eq!(
1389            signal.action,
1390            SignalAction::Custom("my-custom-action".to_string())
1391        );
1392    }
1393
1394    #[test]
1395    fn test_signal_with_source() {
1396        let signal = Signal::pause().with_source(SignalSource::Topic);
1397        assert_eq!(signal.source, SignalSource::Topic);
1398    }
1399
1400    #[test]
1401    fn test_signal_result() {
1402        assert!(SignalResult::Success.is_success());
1403        assert!(SignalResult::Pending("waiting".to_string()).is_success());
1404        assert!(!SignalResult::Failed("error".to_string()).is_success());
1405        assert!(!SignalResult::Ignored("skipped".to_string()).is_success());
1406
1407        assert_eq!(
1408            SignalResult::Failed("error msg".to_string()).error_message(),
1409            Some("error msg")
1410        );
1411        assert_eq!(SignalResult::Success.error_message(), None);
1412    }
1413
1414    #[test]
1415    fn test_signal_stats() {
1416        let stats = SignalStats::default();
1417
1418        stats.record_received();
1419        stats.record_received();
1420        assert_eq!(stats.received(), 2);
1421
1422        stats.record_processed();
1423        assert_eq!(stats.processed(), 1);
1424
1425        stats.record_failed();
1426        assert_eq!(stats.failed(), 1);
1427
1428        stats.record_snapshot();
1429        stats.record_control();
1430    }
1431
1432    #[tokio::test]
1433    async fn test_signal_processor_new() {
1434        let processor = SignalProcessor::new();
1435
1436        assert!(!processor.is_paused());
1437        assert_eq!(processor.stats().received(), 0);
1438    }
1439
1440    #[tokio::test]
1441    async fn test_signal_processor_pause_resume() {
1442        let processor = SignalProcessor::new();
1443
1444        assert!(!processor.is_paused());
1445
1446        processor.pause();
1447        assert!(processor.is_paused());
1448
1449        processor.resume();
1450        assert!(!processor.is_paused());
1451    }
1452
1453    #[tokio::test]
1454    async fn test_signal_processor_process_pause() {
1455        let processor = SignalProcessor::new();
1456
1457        let result = processor.process(Signal::pause()).await;
1458
1459        assert!(result.is_success());
1460        assert!(processor.is_paused());
1461    }
1462
1463    #[tokio::test]
1464    async fn test_signal_processor_process_resume() {
1465        let processor = SignalProcessor::new();
1466        processor.pause();
1467
1468        let result = processor.process(Signal::resume()).await;
1469
1470        assert!(result.is_success());
1471        assert!(!processor.is_paused());
1472    }
1473
1474    #[tokio::test]
1475    async fn test_signal_processor_process_log() {
1476        let processor = SignalProcessor::new();
1477
1478        let result = processor.process(Signal::log("Test log")).await;
1479
1480        assert!(result.is_success());
1481    }
1482
1483    #[tokio::test]
1484    async fn test_signal_processor_custom_handler() {
1485        let processor = SignalProcessor::new();
1486
1487        processor
1488            .register_handler("custom-action", |_signal| async { SignalResult::Success })
1489            .await;
1490
1491        let signal = Signal::custom("custom-action", SignalData::empty());
1492        let result = processor.process(signal).await;
1493
1494        assert!(result.is_success());
1495    }
1496
1497    #[tokio::test]
1498    async fn test_signal_processor_source_filtering() {
1499        let processor = SignalProcessor::new();
1500        processor.set_enabled_sources(vec![SignalSource::Api]).await;
1501
1502        // API source should work
1503        let api_signal = Signal::pause().with_source(SignalSource::Api);
1504        let result = processor.process(api_signal).await;
1505        assert!(result.is_success());
1506
1507        // Topic source should be ignored
1508        let topic_signal = Signal::pause().with_source(SignalSource::Topic);
1509        let result = processor.process(topic_signal).await;
1510        assert!(matches!(result, SignalResult::Ignored(_)));
1511    }
1512
1513    #[tokio::test]
1514    async fn test_signal_processor_snapshot_pending() {
1515        let processor = SignalProcessor::new();
1516
1517        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1518        let result = processor.process(signal).await;
1519
1520        // Without handler, snapshot signals should be pending
1521        assert!(matches!(result, SignalResult::Pending(_)));
1522    }
1523
1524    #[tokio::test]
1525    async fn test_signal_processor_stats() {
1526        let processor = SignalProcessor::new();
1527
1528        processor.process(Signal::log("msg1")).await;
1529        processor.process(Signal::pause()).await;
1530        processor.process(Signal::resume()).await;
1531
1532        assert_eq!(processor.stats().received(), 3);
1533        assert_eq!(processor.stats().processed(), 3);
1534    }
1535
1536    #[test]
1537    fn test_parse_from_row() {
1538        let signal = SignalProcessor::parse_from_row(
1539            "sig-1",
1540            "execute-snapshot",
1541            Some(r#"{"data-collections": ["public.users"]}"#),
1542        )
1543        .unwrap();
1544
1545        assert_eq!(signal.id, "sig-1");
1546        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1547        assert_eq!(signal.source, SignalSource::Source);
1548    }
1549
1550    #[test]
1551    fn test_parse_from_row_no_data() {
1552        let signal = SignalProcessor::parse_from_row("sig-2", "pause-snapshot", None).unwrap();
1553
1554        assert_eq!(signal.id, "sig-2");
1555        assert_eq!(signal.action, SignalAction::PauseSnapshot);
1556    }
1557
1558    #[test]
1559    fn test_parse_from_row_invalid_json() {
1560        let result = SignalProcessor::parse_from_row("sig-3", "log", Some("not valid json"));
1561
1562        assert!(result.is_err());
1563    }
1564
1565    #[tokio::test]
1566    async fn test_signal_channel() {
1567        let (channel, mut receiver) = SignalChannel::new(16);
1568
1569        channel.send(Signal::pause()).await.unwrap();
1570        channel.send(Signal::resume()).await.unwrap();
1571
1572        let sig1 = receiver.recv().await.unwrap();
1573        let sig2 = receiver.recv().await.unwrap();
1574
1575        assert_eq!(sig1.action, SignalAction::PauseSnapshot);
1576        assert_eq!(sig2.action, SignalAction::ResumeSnapshot);
1577    }
1578
1579    #[tokio::test]
1580    async fn test_signal_channel_try_send() {
1581        let (channel, _receiver) = SignalChannel::new(2);
1582
1583        assert!(channel.try_send(Signal::pause()).is_ok());
1584        assert!(channel.try_send(Signal::resume()).is_ok());
1585        // Buffer full
1586        assert!(channel.try_send(Signal::log("overflow")).is_err());
1587    }
1588
1589    #[test]
1590    fn test_signal_serialization() {
1591        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
1592        let json = serde_json::to_string(&signal).unwrap();
1593
1594        assert!(json.contains("execute-snapshot"));
1595        assert!(json.contains("public.users"));
1596
1597        let parsed: Signal = serde_json::from_str(&json).unwrap();
1598        assert_eq!(parsed.action, SignalAction::ExecuteSnapshot);
1599    }
1600
1601    // ========================================================================
1602    // Signal Channel Tests
1603    // ========================================================================
1604
1605    #[test]
1606    fn test_signal_channel_type_str() {
1607        assert_eq!(SignalChannelType::Source.as_str(), "source");
1608        assert_eq!(SignalChannelType::Topic.as_str(), "topic");
1609        assert_eq!(SignalChannelType::File.as_str(), "file");
1610        assert_eq!(SignalChannelType::Api.as_str(), "api");
1611    }
1612
1613    #[test]
1614    fn test_signal_channel_type_parse() {
1615        assert_eq!(
1616            SignalChannelType::parse("source"),
1617            Some(SignalChannelType::Source)
1618        );
1619        assert_eq!(
1620            SignalChannelType::parse("topic"),
1621            Some(SignalChannelType::Topic)
1622        );
1623        assert_eq!(
1624            SignalChannelType::parse("file"),
1625            Some(SignalChannelType::File)
1626        );
1627        assert_eq!(SignalChannelType::parse("unknown"), None);
1628    }
1629
1630    #[test]
1631    fn test_signal_config_default() {
1632        let config = SignalConfig::default();
1633
1634        assert!(config.is_channel_enabled(SignalChannelType::Source));
1635        assert!(config.is_channel_enabled(SignalChannelType::Topic));
1636        assert!(!config.is_channel_enabled(SignalChannelType::File));
1637        assert!(config.signal_data_collection.is_none());
1638        assert!(config.signal_topic.is_none());
1639    }
1640
1641    #[test]
1642    fn test_signal_config_builder() {
1643        let config = SignalConfig::builder()
1644            .enabled_channels(vec![SignalChannelType::Source, SignalChannelType::File])
1645            .signal_data_collection("public.rivven_signal")
1646            .signal_file("/tmp/signals.json")
1647            .signal_poll_interval_ms(500)
1648            .consumer_property("bootstrap.servers", "localhost:9092")
1649            .build();
1650
1651        assert!(config.is_channel_enabled(SignalChannelType::Source));
1652        assert!(config.is_channel_enabled(SignalChannelType::File));
1653        assert!(!config.is_channel_enabled(SignalChannelType::Topic));
1654        assert_eq!(
1655            config.signal_data_collection,
1656            Some("public.rivven_signal".to_string())
1657        );
1658        assert_eq!(config.signal_file, Some("/tmp/signals.json".to_string()));
1659        assert_eq!(config.signal_poll_interval_ms, 500);
1660    }
1661
1662    #[test]
1663    fn test_signal_config_table_name() {
1664        let config = SignalConfig::builder()
1665            .signal_data_collection("public.rivven_signal")
1666            .build();
1667
1668        assert_eq!(config.signal_table_name(), Some("rivven_signal"));
1669        assert_eq!(config.signal_schema_name(), Some("public"));
1670    }
1671
1672    #[test]
1673    fn test_signal_config_parse_channels() {
1674        let channels = SignalConfig::parse_enabled_channels("source, topic, file");
1675
1676        assert_eq!(channels.len(), 3);
1677        assert!(channels.contains(&SignalChannelType::Source));
1678        assert!(channels.contains(&SignalChannelType::Topic));
1679        assert!(channels.contains(&SignalChannelType::File));
1680    }
1681
1682    #[test]
1683    fn test_signal_record_to_signal() {
1684        let record = SignalRecord::new("sig-1", "execute-snapshot")
1685            .with_data(r#"{"data-collections": ["public.users"]}"#);
1686
1687        let signal = record.to_signal(SignalSource::Source).unwrap();
1688
1689        assert_eq!(signal.id, "sig-1");
1690        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
1691        assert_eq!(signal.source, SignalSource::Source);
1692        assert_eq!(signal.data.data_collections, vec!["public.users"]);
1693    }
1694
1695    #[test]
1696    fn test_signal_record_to_signal_no_data() {
1697        let record = SignalRecord::new("sig-2", "pause-snapshot");
1698
1699        let signal = record.to_signal(SignalSource::File).unwrap();
1700
1701        assert_eq!(signal.id, "sig-2");
1702        assert_eq!(signal.action, SignalAction::PauseSnapshot);
1703        assert_eq!(signal.source, SignalSource::File);
1704    }
1705
1706    #[test]
1707    fn test_signal_record_invalid_json() {
1708        let record = SignalRecord::new("sig-3", "log").with_data("not valid json");
1709
1710        assert!(record.to_signal(SignalSource::Api).is_err());
1711    }
1712
1713    #[tokio::test]
1714    async fn test_source_signal_channel() {
1715        let mut channel = SourceSignalChannel::new("public.rivven_signal");
1716
1717        // Initialize
1718        channel.init().await.unwrap();
1719        assert_eq!(channel.name(), "source");
1720
1721        // No signals initially
1722        let signals = channel.read().await.unwrap();
1723        assert!(signals.is_empty());
1724
1725        // Simulate CDC event
1726        channel
1727            .handle_cdc_event(
1728                "sig-1",
1729                "execute-snapshot",
1730                Some(r#"{"data-collections": ["public.orders"]}"#),
1731            )
1732            .await
1733            .unwrap();
1734
1735        // Read signal
1736        let signals = channel.read().await.unwrap();
1737        assert_eq!(signals.len(), 1);
1738        assert_eq!(signals[0].id, "sig-1");
1739        assert_eq!(signals[0].signal_type, "execute-snapshot");
1740
1741        // Should be empty after read
1742        let signals = channel.read().await.unwrap();
1743        assert!(signals.is_empty());
1744
1745        // Close
1746        channel.close().await.unwrap();
1747    }
1748
1749    #[test]
1750    fn test_source_signal_channel_is_signal_event() {
1751        let channel = SourceSignalChannel::new("public.rivven_signal");
1752
1753        assert!(channel.is_signal_event("public", "rivven_signal"));
1754        assert!(!channel.is_signal_event("public", "users"));
1755        assert!(!channel.is_signal_event("other", "rivven_signal"));
1756    }
1757
1758    #[tokio::test]
1759    async fn test_file_signal_channel() {
1760        // Create a temporary file
1761        let temp_dir = std::env::temp_dir();
1762        let signal_file = temp_dir.join(format!("rivven_signals_{}.json", uuid::Uuid::new_v4()));
1763
1764        // Write test signals
1765        let content = r#"{"id":"sig-1","type":"execute-snapshot","data":{"data-collections":["public.users"]}}
1766{"id":"sig-2","type":"pause-snapshot"}
1767# This is a comment
1768{"id":"sig-3","type":"log","data":{"message":"Hello"}}"#;
1769        tokio::fs::write(&signal_file, content).await.unwrap();
1770
1771        let mut channel = FileSignalChannel::new(&signal_file);
1772
1773        // Initialize
1774        channel.init().await.unwrap();
1775        assert_eq!(channel.name(), "file");
1776
1777        // Read signals
1778        let signals = channel.read().await.unwrap();
1779        assert_eq!(signals.len(), 3);
1780        assert_eq!(signals[0].id, "sig-1");
1781        assert_eq!(signals[1].id, "sig-2");
1782        assert_eq!(signals[2].id, "sig-3");
1783
1784        // Second read should return empty (already processed)
1785        let signals = channel.read().await.unwrap();
1786        assert!(signals.is_empty());
1787
1788        // Close and cleanup
1789        channel.close().await.unwrap();
1790        let _ = tokio::fs::remove_file(&signal_file).await;
1791    }
1792
1793    #[tokio::test]
1794    async fn test_signal_manager() {
1795        let config = SignalConfig::builder()
1796            .enabled_channels(vec![SignalChannelType::Source])
1797            .signal_data_collection("public.rivven_signal")
1798            .build();
1799
1800        let processor = Arc::new(SignalProcessor::new());
1801        let mut manager = SignalManager::new(config, processor.clone());
1802
1803        // Create and add source channel
1804        let source_channel = manager.create_source_channel().unwrap();
1805        let pending = source_channel.pending_signals();
1806        manager.add_channel(Box::new(source_channel));
1807
1808        // Initialize
1809        manager.init().await.unwrap();
1810        assert!(manager.is_running());
1811
1812        // Simulate CDC signal
1813        pending
1814            .write()
1815            .await
1816            .push(SignalRecord::new("sig-1", "pause-snapshot"));
1817
1818        // Poll and process
1819        let count = manager.poll().await.unwrap();
1820        assert_eq!(count, 1);
1821
1822        // Processor should have received the signal
1823        assert!(processor.is_paused());
1824
1825        // Close
1826        manager.close().await.unwrap();
1827        assert!(!manager.is_running());
1828    }
1829}