use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Action requested by an inbound signal.
///
/// Covers snapshot lifecycle control plus a free-form `Custom` escape hatch
/// for user-defined actions.
///
/// NOTE(review): with the externally-tagged serde representation,
/// `Custom(name)` serializes as `{"custom": name}` rather than the bare
/// action string returned by `as_str` — confirm this matches the wire
/// format consumers expect.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum SignalAction {
    /// Start a snapshot (incremental or blocking, per the payload's `type`).
    ExecuteSnapshot,
    /// Cancel an in-flight snapshot.
    StopSnapshot,
    /// Pause signal/snapshot processing.
    PauseSnapshot,
    /// Resume signal/snapshot processing.
    ResumeSnapshot,
    /// Emit a log message.
    Log,
    /// Any other action name, carried verbatim.
    Custom(String),
}
impl SignalAction {
pub fn as_str(&self) -> &str {
match self {
SignalAction::ExecuteSnapshot => "execute-snapshot",
SignalAction::StopSnapshot => "stop-snapshot",
SignalAction::PauseSnapshot => "pause-snapshot",
SignalAction::ResumeSnapshot => "resume-snapshot",
SignalAction::Log => "log",
SignalAction::Custom(name) => name,
}
}
pub fn parse(s: &str) -> Self {
match s {
"execute-snapshot" => SignalAction::ExecuteSnapshot,
"stop-snapshot" => SignalAction::StopSnapshot,
"pause-snapshot" => SignalAction::PauseSnapshot,
"resume-snapshot" => SignalAction::ResumeSnapshot,
"log" => SignalAction::Log,
other => SignalAction::Custom(other.to_string()),
}
}
}
/// Payload carried by a [`Signal`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SignalData {
    /// Tables targeted by a snapshot signal (wire name: `data-collections`).
    #[serde(default, rename = "data-collections")]
    pub data_collections: Vec<String>,
    /// Snapshot flavor, e.g. "incremental" or "blocking" (wire name: `type`).
    #[serde(default, rename = "type")]
    pub snapshot_type: Option<String>,
    /// Any additional JSON fields; `flatten` captures keys not matched above.
    #[serde(default, flatten)]
    pub properties: HashMap<String, serde_json::Value>,
}
impl SignalData {
    /// Payload with no tables, no snapshot type and no extra properties.
    pub fn empty() -> Self {
        Self::default()
    }

    /// Payload for a snapshot signal targeting `tables` with the given flavor.
    pub fn for_snapshot(tables: Vec<String>, snapshot_type: &str) -> Self {
        Self {
            data_collections: tables,
            snapshot_type: Some(snapshot_type.to_string()),
            ..Self::default()
        }
    }

    /// Payload for a log signal, carrying `message` in the `message` property.
    pub fn for_log(message: &str) -> Self {
        Self {
            properties: HashMap::from([(
                "message".to_string(),
                serde_json::Value::String(message.to_string()),
            )]),
            ..Self::default()
        }
    }

    /// Builder-style helper: insert `value` under `key` and return `self`.
    pub fn with_property(mut self, key: &str, value: serde_json::Value) -> Self {
        self.properties.insert(key.to_string(), value);
        self
    }

    /// Look up an extra property by key.
    pub fn get_property(&self, key: &str) -> Option<&serde_json::Value> {
        self.properties.get(key)
    }

    /// Convenience accessor for the `message` property of a log signal.
    pub fn log_message(&self) -> Option<&str> {
        self.get_property("message").and_then(|value| value.as_str())
    }
}
/// A control message delivered to the connector at runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Signal {
    /// Caller-supplied unique id (used for dedup/acknowledgement).
    pub id: String,
    /// Requested action (wire name: `type`).
    #[serde(rename = "type")]
    pub action: SignalAction,
    /// Action-specific payload; absent on the wire means an empty payload.
    #[serde(default)]
    pub data: SignalData,
    /// Creation time in epoch milliseconds; defaults to "now" when absent.
    #[serde(default = "default_timestamp")]
    pub timestamp: i64,
    /// Which channel delivered the signal; defaults to `Api`.
    #[serde(default)]
    pub source: SignalSource,
}

/// Serde default for [`Signal::timestamp`]: current time in epoch millis.
fn default_timestamp() -> i64 {
    chrono::Utc::now().timestamp_millis()
}
impl Signal {
pub fn new(id: impl Into<String>, action: SignalAction, data: SignalData) -> Self {
Self {
id: id.into(),
action,
data,
timestamp: chrono::Utc::now().timestamp_millis(),
source: SignalSource::Api,
}
}
pub fn execute_snapshot(tables: Vec<String>) -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::ExecuteSnapshot,
SignalData::for_snapshot(tables, "incremental"),
)
}
pub fn blocking_snapshot(tables: Vec<String>) -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::ExecuteSnapshot,
SignalData::for_snapshot(tables, "blocking"),
)
}
pub fn stop_snapshot() -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::StopSnapshot,
SignalData::empty(),
)
}
pub fn pause() -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::PauseSnapshot,
SignalData::empty(),
)
}
pub fn resume() -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::ResumeSnapshot,
SignalData::empty(),
)
}
pub fn log(message: &str) -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::Log,
SignalData::for_log(message),
)
}
pub fn custom(action: &str, data: SignalData) -> Self {
Self::new(
Uuid::new_v4().to_string(),
SignalAction::Custom(action.to_string()),
data,
)
}
pub fn with_source(mut self, source: SignalSource) -> Self {
self.source = source;
self
}
pub fn is_snapshot_action(&self) -> bool {
matches!(
self.action,
SignalAction::ExecuteSnapshot | SignalAction::StopSnapshot
)
}
pub fn is_control_action(&self) -> bool {
matches!(
self.action,
SignalAction::PauseSnapshot | SignalAction::ResumeSnapshot
)
}
}
/// Which channel a signal arrived through.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SignalSource {
    /// Direct API call (the default).
    #[default]
    Api,
    /// The captured database's signal table.
    Source,
    /// A Kafka topic.
    Kafka,
    /// A watched file.
    File,
    /// A JMX endpoint.
    Jmx,
}
/// Outcome of processing one signal.
#[derive(Debug, Clone)]
pub enum SignalResult {
    /// Handled immediately.
    Success,
    /// Accepted and queued for later handling (counts as success).
    Pending(String),
    /// Skipped, with the reason (e.g. disabled source or no handler).
    Ignored(String),
    /// Handler reported an error.
    Failed(String),
}
impl SignalResult {
pub fn is_success(&self) -> bool {
matches!(self, SignalResult::Success | SignalResult::Pending(_))
}
pub fn error_message(&self) -> Option<&str> {
match self {
SignalResult::Failed(msg) => Some(msg),
_ => None,
}
}
}
/// Asynchronous handler for one or more signal actions.
///
/// Uses return-position `impl Future` (rather than `async fn`) so the `Send`
/// bound can be stated explicitly; note this makes the trait non-dyn-safe.
pub trait SignalHandler: Send + Sync {
    /// Handle a single signal and report the outcome.
    fn handle(&self, signal: &Signal) -> impl std::future::Future<Output = SignalResult> + Send;
    /// Actions this handler should be invoked for.
    fn supported_actions(&self) -> Vec<SignalAction>;
}
/// Lock-free counters describing signal traffic.
///
/// All updates use relaxed ordering, so individual counters are accurate but
/// may be momentarily inconsistent with one another.
#[derive(Debug, Default)]
pub struct SignalStats {
    signals_received: AtomicU64,
    signals_processed: AtomicU64,
    signals_failed: AtomicU64,
    signals_ignored: AtomicU64,
    snapshot_signals: AtomicU64,
    control_signals: AtomicU64,
}
impl SignalStats {
    /// Count one signal arriving at the processor (before any filtering).
    pub fn record_received(&self) {
        self.signals_received.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one signal handled successfully (or queued as pending).
    pub fn record_processed(&self) {
        self.signals_processed.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one signal whose handler reported a failure.
    pub fn record_failed(&self) {
        self.signals_failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one signal skipped (disabled source or no handler).
    pub fn record_ignored(&self) {
        self.signals_ignored.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one snapshot-lifecycle signal (execute/stop).
    pub fn record_snapshot(&self) {
        self.snapshot_signals.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one control signal (pause/resume).
    pub fn record_control(&self) {
        self.control_signals.fetch_add(1, Ordering::Relaxed);
    }

    /// Total signals received.
    pub fn received(&self) -> u64 {
        self.signals_received.load(Ordering::Relaxed)
    }

    /// Total signals processed (success or pending).
    pub fn processed(&self) -> u64 {
        self.signals_processed.load(Ordering::Relaxed)
    }

    /// Total signals that failed processing.
    pub fn failed(&self) -> u64 {
        self.signals_failed.load(Ordering::Relaxed)
    }

    /// Total signals ignored. Previously this counter (and the two below)
    /// was write-only — it was recorded but had no accessor, so the value
    /// could never be observed.
    pub fn ignored(&self) -> u64 {
        self.signals_ignored.load(Ordering::Relaxed)
    }

    /// Total snapshot-lifecycle signals seen.
    pub fn snapshots(&self) -> u64 {
        self.snapshot_signals.load(Ordering::Relaxed)
    }

    /// Total control (pause/resume) signals seen.
    pub fn controls(&self) -> u64 {
        self.control_signals.load(Ordering::Relaxed)
    }
}
/// Type-erased async signal handler stored by [`SignalProcessor`]:
/// a thread-safe closure returning a boxed, pinned future.
type BoxedHandler = Box<
    dyn Fn(&Signal) -> std::pin::Pin<Box<dyn std::future::Future<Output = SignalResult> + Send>>
        + Send
        + Sync,
>;
/// Dispatches incoming [`Signal`]s to registered handlers and built-in
/// pause/resume/log behavior, maintaining shared [`SignalStats`].
pub struct SignalProcessor {
    /// Handlers keyed by action name (see [`SignalAction::as_str`]).
    handlers: RwLock<HashMap<String, BoxedHandler>>,
    stats: Arc<SignalStats>,
    /// Set/cleared by pause/resume signals; see `is_paused`.
    paused: AtomicBool,
    /// Only signals from these sources are processed; others are ignored.
    enabled_sources: RwLock<Vec<SignalSource>>,
}
impl Default for SignalProcessor {
    /// Equivalent to [`SignalProcessor::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl SignalProcessor {
    /// Create a processor with no handlers registered; only `Api` and
    /// `Source` signal sources are accepted by default.
    pub fn new() -> Self {
        Self {
            handlers: RwLock::new(HashMap::new()),
            stats: Arc::new(SignalStats::default()),
            paused: AtomicBool::new(false),
            enabled_sources: RwLock::new(vec![SignalSource::Api, SignalSource::Source]),
        }
    }

    /// Shared counters describing signal traffic.
    pub fn stats(&self) -> &Arc<SignalStats> {
        &self.stats
    }

    /// Whether a pause is in effect.
    ///
    /// NOTE(review): `process` itself does not consult this flag — callers
    /// are expected to check it before doing snapshot work. Confirm that is
    /// the intended contract.
    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Relaxed)
    }

    /// Mark the processor paused (also triggered by a `PauseSnapshot` signal).
    pub fn pause(&self) {
        self.paused.store(true, Ordering::Relaxed);
        info!("Signal processor paused");
    }

    /// Clear the paused flag (also triggered by a `ResumeSnapshot` signal).
    pub fn resume(&self) {
        self.paused.store(false, Ordering::Relaxed);
        info!("Signal processor resumed");
    }

    /// Register an async handler for the given action name (see
    /// [`SignalAction::as_str`]); replaces any previous handler for it.
    pub async fn register_handler<F, Fut>(&self, action: &str, handler: F)
    where
        F: Fn(&Signal) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = SignalResult> + Send + 'static,
    {
        // Erase the concrete future type so handlers can live in one map.
        let boxed: BoxedHandler = Box::new(move |signal| Box::pin(handler(signal)));
        self.handlers
            .write()
            .await
            .insert(action.to_string(), boxed);
        debug!("Registered handler for action: {}", action);
    }

    /// Replace the set of signal sources that will be processed.
    pub async fn set_enabled_sources(&self, sources: Vec<SignalSource>) {
        *self.enabled_sources.write().await = sources;
    }

    /// Whether signals from `source` are currently accepted.
    pub async fn is_source_enabled(&self, source: &SignalSource) -> bool {
        self.enabled_sources.read().await.contains(source)
    }

    /// Route one signal: filter by source, update stats, run built-in
    /// pause/resume/log behavior, or dispatch to a registered handler.
    ///
    /// Snapshot signals with no handler return `Pending` (queued for a
    /// snapshot engine to pick up); other unhandled actions are `Ignored`.
    pub async fn process(&self, signal: Signal) -> SignalResult {
        self.stats.record_received();
        // Source filter runs first: disabled sources never reach handlers.
        if !self.is_source_enabled(&signal.source).await {
            debug!(
                "Signal source {:?} not enabled, ignoring: {}",
                signal.source, signal.id
            );
            self.stats.record_ignored();
            return SignalResult::Ignored(format!("Source {:?} not enabled", signal.source));
        }
        info!(
            "Processing signal: id={}, action={:?}, source={:?}",
            signal.id, signal.action, signal.source
        );
        // Category counters are recorded regardless of the eventual outcome.
        if signal.is_snapshot_action() {
            self.stats.record_snapshot();
        }
        if signal.is_control_action() {
            self.stats.record_control();
        }
        let result = match &signal.action {
            SignalAction::PauseSnapshot => {
                self.pause();
                SignalResult::Success
            }
            SignalAction::ResumeSnapshot => {
                self.resume();
                SignalResult::Success
            }
            SignalAction::Log => {
                // A log signal with no "message" property still succeeds.
                if let Some(msg) = signal.data.log_message() {
                    info!("Signal log message: {}", msg);
                }
                SignalResult::Success
            }
            _ => {
                // Everything else (snapshots, custom actions) goes through
                // the registered-handler map; the read lock is held for the
                // duration of the handler call.
                let handlers = self.handlers.read().await;
                if let Some(handler) = handlers.get(signal.action.as_str()) {
                    handler(&signal).await
                } else {
                    if signal.is_snapshot_action() {
                        SignalResult::Pending(format!(
                            "Snapshot signal {} queued for processing",
                            signal.id
                        ))
                    } else {
                        warn!("No handler for action: {:?}", signal.action);
                        SignalResult::Ignored(format!("No handler for action: {:?}", signal.action))
                    }
                }
            }
        };
        // Outcome counters: Pending counts as processed (see is_success).
        match &result {
            SignalResult::Success | SignalResult::Pending(_) => {
                self.stats.record_processed();
            }
            SignalResult::Failed(_) => {
                self.stats.record_failed();
            }
            SignalResult::Ignored(_) => {
                self.stats.record_ignored();
            }
        }
        result
    }

    /// Build a [`Signal`] from raw signal-table columns (id, type, data),
    /// tagging it with the `Source` origin. Fails only on malformed JSON.
    pub fn parse_from_row(
        id: &str,
        signal_type: &str,
        data: Option<&str>,
    ) -> Result<Signal, String> {
        let action = SignalAction::parse(signal_type);
        let signal_data = if let Some(data_str) = data {
            serde_json::from_str(data_str)
                .map_err(|e| format!("Failed to parse signal data: {}", e))?
        } else {
            SignalData::empty()
        };
        Ok(Signal::new(id, action, signal_data).with_source(SignalSource::Source))
    }
}
/// Cloneable sending half of an in-process signal queue.
#[derive(Clone)]
pub struct SignalChannel {
    sender: tokio::sync::mpsc::Sender<Signal>,
}
impl SignalChannel {
    /// Create a bounded channel; hand the receiver to the signal consumer.
    pub fn new(buffer_size: usize) -> (Self, tokio::sync::mpsc::Receiver<Signal>) {
        let (sender, receiver) = tokio::sync::mpsc::channel(buffer_size);
        let channel = Self { sender };
        (channel, receiver)
    }

    /// Send a signal, waiting if the channel is full.
    pub async fn send(&self, signal: Signal) -> Result<(), String> {
        match self.sender.send(signal).await {
            Ok(()) => Ok(()),
            Err(e) => Err(format!("Failed to send signal: {}", e)),
        }
    }

    /// Send without waiting; errors when the channel is full or closed.
    pub fn try_send(&self, signal: Signal) -> Result<(), String> {
        match self.sender.try_send(signal) {
            Ok(()) => Ok(()),
            Err(e) => Err(format!("Failed to send signal: {}", e)),
        }
    }
}
/// Kinds of channels a connector can watch for incoming signals.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum SignalChannelType {
    /// Signal table observed in the CDC stream itself (the default).
    #[default]
    Source,
    /// Kafka topic (configuration name: "kafka"; "topic" also parses).
    Topic,
    /// Local file.
    File,
    /// Direct API.
    Api,
    /// JMX endpoint.
    Jmx,
}
impl SignalChannelType {
pub fn as_str(&self) -> &'static str {
match self {
SignalChannelType::Source => "source",
SignalChannelType::Topic => "kafka", SignalChannelType::File => "file",
SignalChannelType::Api => "api",
SignalChannelType::Jmx => "jmx",
}
}
pub fn parse(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"source" => Some(SignalChannelType::Source),
"kafka" | "topic" => Some(SignalChannelType::Topic),
"file" => Some(SignalChannelType::File),
"api" => Some(SignalChannelType::Api),
"jmx" => Some(SignalChannelType::Jmx),
_ => None,
}
}
}
/// Configuration for signal delivery channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalConfig {
    /// Channels to listen on; defaults to [Source, Topic].
    #[serde(default = "default_enabled_channels")]
    pub enabled_channels: Vec<SignalChannelType>,
    /// Signal table, optionally schema-qualified ("schema.table").
    #[serde(default)]
    pub signal_data_collection: Option<String>,
    /// Topic to consume signals from (used when the Topic channel is on).
    #[serde(default)]
    pub signal_topic: Option<String>,
    /// Path of the signal file (used when the File channel is on).
    #[serde(default)]
    pub signal_file: Option<String>,
    /// How often channels are polled, in milliseconds (default 1000).
    #[serde(default = "default_poll_interval_ms")]
    pub signal_poll_interval_ms: u64,
    /// Extra consumer properties (e.g. Kafka client settings).
    #[serde(default)]
    pub signal_consumer_properties: HashMap<String, String>,
}
/// Serde/builder default: listen on the source table and the topic channel.
fn default_enabled_channels() -> Vec<SignalChannelType> {
    vec![SignalChannelType::Source, SignalChannelType::Topic]
}

/// Serde/builder default poll interval: one second.
fn default_poll_interval_ms() -> u64 {
    1000
}
impl Default for SignalConfig {
fn default() -> Self {
Self {
enabled_channels: default_enabled_channels(),
signal_data_collection: None,
signal_topic: None,
signal_file: None,
signal_poll_interval_ms: default_poll_interval_ms(),
signal_consumer_properties: HashMap::new(),
}
}
}
impl SignalConfig {
    /// Start building a config fluently.
    pub fn builder() -> SignalConfigBuilder {
        SignalConfigBuilder::default()
    }

    /// Whether `channel` is in the enabled set.
    pub fn is_channel_enabled(&self, channel: SignalChannelType) -> bool {
        self.enabled_channels.contains(&channel)
    }

    /// Last dot-separated segment of `signal_data_collection`
    /// (e.g. "public.debezium_signal" -> "debezium_signal").
    pub fn signal_table_name(&self) -> Option<&str> {
        let collection = self.signal_data_collection.as_deref()?;
        collection.rsplit('.').next()
    }

    /// First dot-separated segment of `signal_data_collection`; present only
    /// when the name is qualified (contains at least one '.').
    pub fn signal_schema_name(&self) -> Option<&str> {
        let collection = self.signal_data_collection.as_deref()?;
        collection.split_once('.').map(|(schema, _)| schema)
    }

    /// Parse a comma-separated channel list, silently dropping unknown names.
    pub fn parse_enabled_channels(s: &str) -> Vec<SignalChannelType> {
        let mut channels = Vec::new();
        for part in s.split(',') {
            if let Some(channel) = SignalChannelType::parse(part.trim()) {
                channels.push(channel);
            }
        }
        channels
    }
}
/// Fluent builder for [`SignalConfig`]; unset fields fall back to the same
/// defaults as `SignalConfig::default()`.
#[derive(Debug, Default)]
pub struct SignalConfigBuilder {
    enabled_channels: Option<Vec<SignalChannelType>>,
    signal_data_collection: Option<String>,
    signal_topic: Option<String>,
    signal_file: Option<String>,
    signal_poll_interval_ms: Option<u64>,
    signal_consumer_properties: HashMap<String, String>,
}
impl SignalConfigBuilder {
pub fn enabled_channels(mut self, channels: Vec<SignalChannelType>) -> Self {
self.enabled_channels = Some(channels);
self
}
pub fn enable_channel(mut self, channel: SignalChannelType) -> Self {
self.enabled_channels
.get_or_insert_with(Vec::new)
.push(channel);
self
}
pub fn signal_data_collection(mut self, collection: impl Into<String>) -> Self {
self.signal_data_collection = Some(collection.into());
self
}
pub fn signal_topic(mut self, topic: impl Into<String>) -> Self {
self.signal_topic = Some(topic.into());
self
}
pub fn signal_file(mut self, path: impl Into<String>) -> Self {
self.signal_file = Some(path.into());
self
}
pub fn signal_poll_interval_ms(mut self, ms: u64) -> Self {
self.signal_poll_interval_ms = Some(ms);
self
}
pub fn consumer_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.signal_consumer_properties
.insert(key.into(), value.into());
self
}
pub fn build(self) -> SignalConfig {
SignalConfig {
enabled_channels: self
.enabled_channels
.unwrap_or_else(default_enabled_channels),
signal_data_collection: self.signal_data_collection,
signal_topic: self.signal_topic,
signal_file: self.signal_file,
signal_poll_interval_ms: self
.signal_poll_interval_ms
.unwrap_or_else(default_poll_interval_ms),
signal_consumer_properties: self.signal_consumer_properties,
}
}
}
/// Raw signal row as produced by a channel reader, before JSON decoding.
#[derive(Debug, Clone)]
pub struct SignalRecord {
    /// Unique signal id (dedup/acknowledgement key).
    pub id: String,
    /// Action name as a raw string (see [`SignalAction::parse`]).
    pub signal_type: String,
    /// Optional JSON payload, decoded into [`SignalData`] later.
    pub data: Option<String>,
    /// Channel-specific offset, if the channel tracks one.
    pub offset: Option<String>,
}
impl SignalRecord {
    /// Record with no payload and no offset.
    pub fn new(id: impl Into<String>, signal_type: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            signal_type: signal_type.into(),
            data: None,
            offset: None,
        }
    }

    /// Builder-style helper attaching a raw JSON payload.
    pub fn with_data(mut self, data: impl Into<String>) -> Self {
        self.data = Some(data.into());
        self
    }

    /// Builder-style helper attaching a channel offset.
    pub fn with_offset(mut self, offset: impl Into<String>) -> Self {
        self.offset = Some(offset.into());
        self
    }

    /// Decode into a [`Signal`] tagged with `source`.
    /// Fails only when the payload is present but not valid JSON.
    pub fn to_signal(&self, source: SignalSource) -> Result<Signal, String> {
        let signal_data = match self.data.as_deref() {
            Some(raw) => serde_json::from_str(raw)
                .map_err(|e| format!("Failed to parse signal data: {}", e))?,
            None => SignalData::empty(),
        };
        let action = SignalAction::parse(&self.signal_type);
        Ok(Signal::new(self.id.as_str(), action, signal_data).with_source(source))
    }
}
/// A pollable source of raw signal records (signal table, file, topic, ...).
#[async_trait::async_trait]
pub trait SignalChannelReader: Send + Sync {
    /// Short channel name (the manager maps it to a [`SignalSource`]).
    fn name(&self) -> &str;
    /// Open/prepare the underlying resource.
    async fn init(&mut self) -> Result<(), String>;
    /// Return all new records since the previous call.
    async fn read(&mut self) -> Result<Vec<SignalRecord>, String>;
    /// Confirm a record was processed; default implementation is a no-op.
    async fn acknowledge(&mut self, _signal_id: &str) -> Result<(), String> {
        Ok(())
    }
    /// Release the underlying resource.
    async fn close(&mut self) -> Result<(), String>;
}
/// Channel that surfaces signals written to a designated table in the
/// captured database, fed from the CDC event stream via `handle_cdc_event`.
pub struct SourceSignalChannel {
    /// Signal table name, either "schema.table" or a bare table name.
    signal_table: String,
    /// Records seen in the stream but not yet returned by `read()`.
    pending: Arc<RwLock<Vec<SignalRecord>>>,
    initialized: bool,
}
impl SourceSignalChannel {
pub fn new(signal_table: impl Into<String>) -> Self {
Self {
signal_table: signal_table.into(),
pending: Arc::new(RwLock::new(Vec::new())),
initialized: false,
}
}
pub fn pending_signals(&self) -> Arc<RwLock<Vec<SignalRecord>>> {
Arc::clone(&self.pending)
}
pub fn is_signal_event(&self, schema: &str, table: &str) -> bool {
let expected = format!("{}.{}", schema, table);
self.signal_table == expected || self.signal_table == table
}
pub async fn handle_cdc_event(
&self,
id: &str,
signal_type: &str,
data: Option<&str>,
) -> Result<(), String> {
let record = SignalRecord {
id: id.to_string(),
signal_type: signal_type.to_string(),
data: data.map(|s| s.to_string()),
offset: None,
};
self.pending.write().await.push(record);
debug!(
"Source channel: detected signal {} of type {}",
id, signal_type
);
Ok(())
}
}
#[async_trait::async_trait]
impl SignalChannelReader for SourceSignalChannel {
    fn name(&self) -> &str {
        "source"
    }

    /// Nothing external to open; just mark the channel ready.
    async fn init(&mut self) -> Result<(), String> {
        info!(
            "Source signal channel initialized for table: {}",
            self.signal_table
        );
        self.initialized = true;
        Ok(())
    }

    /// Drain and return everything queued since the previous read.
    async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
        let mut queue = self.pending.write().await;
        let drained: Vec<SignalRecord> = queue.drain(..).collect();
        if !drained.is_empty() {
            debug!("Source channel: returning {} signals", drained.len());
        }
        Ok(drained)
    }

    async fn close(&mut self) -> Result<(), String> {
        info!("Source signal channel closed");
        self.initialized = false;
        Ok(())
    }
}
/// Signal channel that polls a JSON-lines file on local disk.
///
/// Each non-empty, non-`#` line is one JSON object:
/// `{"id": ..., "type": ..., "data": ...}`. Records are deduplicated by id
/// across reads.
pub struct FileSignalChannel {
    path: std::path::PathBuf,
    /// Ids already returned; prevents re-emitting a signal when the file is
    /// re-read. NOTE: grows unboundedly for a long-lived, busy signal file.
    processed: std::collections::HashSet<String>,
    /// (mtime, length) observed at the last read; used as a cheap
    /// "file unchanged" fast-path.
    last_seen: Option<(std::time::SystemTime, u64)>,
    initialized: bool,
}

impl FileSignalChannel {
    /// Create a channel reading signals from `path` (created empty on
    /// `init` if it does not exist yet).
    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
        Self {
            path: path.into(),
            processed: std::collections::HashSet::new(),
            last_seen: None,
            initialized: false,
        }
    }
}

#[async_trait::async_trait]
impl SignalChannelReader for FileSignalChannel {
    fn name(&self) -> &str {
        "file"
    }

    /// Ensure the signal file exists so later metadata/read calls succeed.
    async fn init(&mut self) -> Result<(), String> {
        if !self.path.exists() {
            tokio::fs::write(&self.path, "")
                .await
                .map_err(|e| format!("Failed to create signal file: {}", e))?;
        }
        info!("File signal channel initialized: {:?}", self.path);
        self.initialized = true;
        Ok(())
    }

    /// Re-read the file when it changed and return records not seen before.
    async fn read(&mut self) -> Result<Vec<SignalRecord>, String> {
        let metadata = tokio::fs::metadata(&self.path)
            .await
            .map_err(|e| format!("Failed to read signal file metadata: {}", e))?;
        let modified = metadata
            .modified()
            .map_err(|e| format!("Failed to get file modification time: {}", e))?;
        // Fast-path: skip re-reading only when BOTH mtime and size are
        // unchanged. The previous mtime-only check could silently drop
        // signals appended within the filesystem's timestamp granularity
        // (two writes landing on the same mtime). The processed set below
        // guarantees no duplicates when we do re-read.
        let fingerprint = (modified, metadata.len());
        if self.last_seen == Some(fingerprint) {
            return Ok(Vec::new());
        }
        self.last_seen = Some(fingerprint);
        let content = tokio::fs::read_to_string(&self.path)
            .await
            .map_err(|e| format!("Failed to read signal file: {}", e))?;
        let mut signals = Vec::new();
        for line in content.lines() {
            let line = line.trim();
            // Blank lines and '#' comments are allowed in the signal file.
            if line.is_empty() || line.starts_with('#') {
                continue;
            }
            // Wire shape of one signal line.
            #[derive(Deserialize)]
            struct FileSignal {
                id: String,
                #[serde(rename = "type")]
                signal_type: String,
                data: Option<serde_json::Value>,
            }
            match serde_json::from_str::<FileSignal>(line) {
                Ok(fs) => {
                    // `insert` returns false for ids already delivered, so
                    // each id is emitted at most once (single set lookup).
                    if self.processed.insert(fs.id.clone()) {
                        signals.push(SignalRecord {
                            id: fs.id,
                            signal_type: fs.signal_type,
                            data: fs.data.map(|v| v.to_string()),
                            offset: None,
                        });
                    }
                }
                Err(e) => {
                    // Malformed lines are logged and skipped, never fatal.
                    warn!("Failed to parse signal line: {} - {}", line, e);
                }
            }
        }
        if !signals.is_empty() {
            debug!("File channel: read {} new signals", signals.len());
        }
        Ok(signals)
    }

    async fn close(&mut self) -> Result<(), String> {
        info!("File signal channel closed");
        self.initialized = false;
        Ok(())
    }
}
/// Owns the set of channel readers and pumps their records into a
/// [`SignalProcessor`].
pub struct SignalManager {
    channels: Vec<Box<dyn SignalChannelReader>>,
    processor: Arc<SignalProcessor>,
    config: SignalConfig,
    /// Flipped by `init`/`close`; shareable across tasks via the Arc.
    running: Arc<AtomicBool>,
}
impl SignalManager {
    /// Create a manager with no channels attached yet.
    pub fn new(config: SignalConfig, processor: Arc<SignalProcessor>) -> Self {
        Self {
            channels: Vec::new(),
            processor,
            config,
            running: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Attach a channel reader; `init` must still be called before polling.
    pub fn add_channel(&mut self, channel: Box<dyn SignalChannelReader>) {
        info!("Adding signal channel: {}", channel.name());
        self.channels.push(channel);
    }

    /// Initialize every attached channel and mark the manager running.
    /// Fails fast on the first channel that cannot initialize.
    pub async fn init(&mut self) -> Result<(), String> {
        for channel in &mut self.channels {
            channel.init().await?;
        }
        self.running.store(true, Ordering::SeqCst);
        info!(
            "Signal manager initialized with {} channels",
            self.channels.len()
        );
        Ok(())
    }

    /// Drain all channels once, feeding each record through the processor.
    ///
    /// Returns the number of successfully parsed signals (processing
    /// failures still count toward the total; unparseable records are
    /// logged and skipped). Successfully handled or pending signals are
    /// acknowledged back to their originating channel.
    pub async fn poll(&mut self) -> Result<usize, String> {
        let mut total = 0;
        for channel in &mut self.channels {
            let records = channel.read().await?;
            for record in records {
                // Map the channel's name onto the signal-source enum so the
                // processor's source filter applies.
                let source = match channel.name() {
                    "source" => SignalSource::Source,
                    "file" => SignalSource::File,
                    "kafka" | "topic" => SignalSource::Kafka,
                    _ => SignalSource::Api,
                };
                match record.to_signal(source) {
                    Ok(signal) => {
                        let result = self.processor.process(signal).await;
                        if result.is_success() {
                            if let Err(e) = channel.acknowledge(&record.id).await {
                                warn!("Failed to acknowledge signal {}: {}", record.id, e);
                            }
                        }
                        total += 1;
                    }
                    Err(e) => {
                        warn!("Failed to parse signal {}: {}", record.id, e);
                    }
                }
            }
        }
        Ok(total)
    }

    /// Close all channels, logging (not propagating) per-channel failures.
    pub async fn close(&mut self) -> Result<(), String> {
        self.running.store(false, Ordering::SeqCst);
        for channel in &mut self.channels {
            if let Err(e) = channel.close().await {
                warn!("Failed to close channel {}: {}", channel.name(), e);
            }
        }
        info!("Signal manager closed");
        Ok(())
    }

    /// Whether `init` has run and `close` has not.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// The configuration this manager was built with.
    pub fn config(&self) -> &SignalConfig {
        &self.config
    }

    /// Shared handle to the underlying processor.
    pub fn processor(&self) -> &Arc<SignalProcessor> {
        &self.processor
    }

    /// Build a source channel from config when the Source channel is
    /// enabled AND a signal table is configured; `None` otherwise.
    pub fn create_source_channel(&self) -> Option<SourceSignalChannel> {
        if self.config.is_channel_enabled(SignalChannelType::Source) {
            self.config
                .signal_data_collection
                .as_ref()
                .map(SourceSignalChannel::new)
        } else {
            None
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests covering actions, payloads, results, stats, processor
    // dispatch, channels, config parsing and the source/file readers.
    use super::*;

    #[test]
    fn test_signal_action_str() {
        assert_eq!(SignalAction::ExecuteSnapshot.as_str(), "execute-snapshot");
        assert_eq!(SignalAction::StopSnapshot.as_str(), "stop-snapshot");
        assert_eq!(SignalAction::PauseSnapshot.as_str(), "pause-snapshot");
        assert_eq!(SignalAction::ResumeSnapshot.as_str(), "resume-snapshot");
        assert_eq!(SignalAction::Log.as_str(), "log");
        assert_eq!(
            SignalAction::Custom("my-action".to_string()).as_str(),
            "my-action"
        );
    }

    #[test]
    fn test_signal_action_parse() {
        assert_eq!(
            SignalAction::parse("execute-snapshot"),
            SignalAction::ExecuteSnapshot
        );
        assert_eq!(
            SignalAction::parse("pause-snapshot"),
            SignalAction::PauseSnapshot
        );
        // Unknown names round-trip as Custom rather than failing.
        assert_eq!(
            SignalAction::parse("unknown"),
            SignalAction::Custom("unknown".to_string())
        );
    }

    #[test]
    fn test_signal_data_empty() {
        let data = SignalData::empty();
        assert!(data.data_collections.is_empty());
        assert!(data.snapshot_type.is_none());
        assert!(data.properties.is_empty());
    }

    #[test]
    fn test_signal_data_for_snapshot() {
        let data = SignalData::for_snapshot(
            vec!["public.users".to_string(), "public.orders".to_string()],
            "incremental",
        );
        assert_eq!(data.data_collections.len(), 2);
        assert_eq!(data.snapshot_type, Some("incremental".to_string()));
    }

    #[test]
    fn test_signal_data_for_log() {
        let data = SignalData::for_log("Test message");
        assert_eq!(data.log_message(), Some("Test message"));
    }

    #[test]
    fn test_signal_data_properties() {
        let data = SignalData::empty()
            .with_property("key1", serde_json::json!("value1"))
            .with_property("key2", serde_json::json!(42));
        assert_eq!(
            data.get_property("key1"),
            Some(&serde_json::json!("value1"))
        );
        assert_eq!(data.get_property("key2"), Some(&serde_json::json!(42)));
        assert_eq!(data.get_property("key3"), None);
    }

    #[test]
    fn test_signal_execute_snapshot() {
        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
        assert_eq!(signal.data.data_collections, vec!["public.users"]);
        assert_eq!(signal.data.snapshot_type, Some("incremental".to_string()));
        assert!(signal.is_snapshot_action());
        assert!(!signal.is_control_action());
    }

    #[test]
    fn test_signal_blocking_snapshot() {
        let signal = Signal::blocking_snapshot(vec!["public.orders".to_string()]);
        assert_eq!(signal.data.snapshot_type, Some("blocking".to_string()));
    }

    #[test]
    fn test_signal_stop_snapshot() {
        let signal = Signal::stop_snapshot();
        assert_eq!(signal.action, SignalAction::StopSnapshot);
        assert!(signal.is_snapshot_action());
    }

    #[test]
    fn test_signal_pause() {
        let signal = Signal::pause();
        assert_eq!(signal.action, SignalAction::PauseSnapshot);
        assert!(signal.is_control_action());
        assert!(!signal.is_snapshot_action());
    }

    #[test]
    fn test_signal_resume() {
        let signal = Signal::resume();
        assert_eq!(signal.action, SignalAction::ResumeSnapshot);
        assert!(signal.is_control_action());
    }

    #[test]
    fn test_signal_log() {
        let signal = Signal::log("Hello, CDC!");
        assert_eq!(signal.action, SignalAction::Log);
        assert_eq!(signal.data.log_message(), Some("Hello, CDC!"));
    }

    #[test]
    fn test_signal_custom() {
        let data =
            SignalData::empty().with_property("custom_field", serde_json::json!("custom_value"));
        let signal = Signal::custom("my-custom-action", data);
        assert_eq!(
            signal.action,
            SignalAction::Custom("my-custom-action".to_string())
        );
    }

    #[test]
    fn test_signal_with_source() {
        let signal = Signal::pause().with_source(SignalSource::Kafka);
        assert_eq!(signal.source, SignalSource::Kafka);
    }

    #[test]
    fn test_signal_result() {
        // Pending counts as success; Ignored/Failed do not.
        assert!(SignalResult::Success.is_success());
        assert!(SignalResult::Pending("waiting".to_string()).is_success());
        assert!(!SignalResult::Failed("error".to_string()).is_success());
        assert!(!SignalResult::Ignored("skipped".to_string()).is_success());
        assert_eq!(
            SignalResult::Failed("error msg".to_string()).error_message(),
            Some("error msg")
        );
        assert_eq!(SignalResult::Success.error_message(), None);
    }

    #[test]
    fn test_signal_stats() {
        let stats = SignalStats::default();
        stats.record_received();
        stats.record_received();
        assert_eq!(stats.received(), 2);
        stats.record_processed();
        assert_eq!(stats.processed(), 1);
        stats.record_failed();
        assert_eq!(stats.failed(), 1);
        stats.record_snapshot();
        stats.record_control();
    }

    #[tokio::test]
    async fn test_signal_processor_new() {
        let processor = SignalProcessor::new();
        assert!(!processor.is_paused());
        assert_eq!(processor.stats().received(), 0);
    }

    #[tokio::test]
    async fn test_signal_processor_pause_resume() {
        let processor = SignalProcessor::new();
        assert!(!processor.is_paused());
        processor.pause();
        assert!(processor.is_paused());
        processor.resume();
        assert!(!processor.is_paused());
    }

    #[tokio::test]
    async fn test_signal_processor_process_pause() {
        let processor = SignalProcessor::new();
        let result = processor.process(Signal::pause()).await;
        assert!(result.is_success());
        assert!(processor.is_paused());
    }

    #[tokio::test]
    async fn test_signal_processor_process_resume() {
        let processor = SignalProcessor::new();
        processor.pause();
        let result = processor.process(Signal::resume()).await;
        assert!(result.is_success());
        assert!(!processor.is_paused());
    }

    #[tokio::test]
    async fn test_signal_processor_process_log() {
        let processor = SignalProcessor::new();
        let result = processor.process(Signal::log("Test log")).await;
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_signal_processor_custom_handler() {
        let processor = SignalProcessor::new();
        processor
            .register_handler("custom-action", |_signal| async { SignalResult::Success })
            .await;
        let signal = Signal::custom("custom-action", SignalData::empty());
        let result = processor.process(signal).await;
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_signal_processor_source_filtering() {
        let processor = SignalProcessor::new();
        processor.set_enabled_sources(vec![SignalSource::Api]).await;
        let api_signal = Signal::pause().with_source(SignalSource::Api);
        let result = processor.process(api_signal).await;
        assert!(result.is_success());
        // A source not in the enabled set is ignored, not failed.
        let kafka_signal = Signal::pause().with_source(SignalSource::Kafka);
        let result = processor.process(kafka_signal).await;
        assert!(matches!(result, SignalResult::Ignored(_)));
    }

    #[tokio::test]
    async fn test_signal_processor_snapshot_pending() {
        // Snapshot signal with no handler registered -> queued as Pending.
        let processor = SignalProcessor::new();
        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
        let result = processor.process(signal).await;
        assert!(matches!(result, SignalResult::Pending(_)));
    }

    #[tokio::test]
    async fn test_signal_processor_stats() {
        let processor = SignalProcessor::new();
        processor.process(Signal::log("msg1")).await;
        processor.process(Signal::pause()).await;
        processor.process(Signal::resume()).await;
        assert_eq!(processor.stats().received(), 3);
        assert_eq!(processor.stats().processed(), 3);
    }

    #[test]
    fn test_parse_from_row() {
        let signal = SignalProcessor::parse_from_row(
            "sig-1",
            "execute-snapshot",
            Some(r#"{"data-collections": ["public.users"]}"#),
        )
        .unwrap();
        assert_eq!(signal.id, "sig-1");
        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
        assert_eq!(signal.source, SignalSource::Source);
    }

    #[test]
    fn test_parse_from_row_no_data() {
        let signal = SignalProcessor::parse_from_row("sig-2", "pause-snapshot", None).unwrap();
        assert_eq!(signal.id, "sig-2");
        assert_eq!(signal.action, SignalAction::PauseSnapshot);
    }

    #[test]
    fn test_parse_from_row_invalid_json() {
        let result = SignalProcessor::parse_from_row("sig-3", "log", Some("not valid json"));
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_signal_channel() {
        let (channel, mut receiver) = SignalChannel::new(16);
        channel.send(Signal::pause()).await.unwrap();
        channel.send(Signal::resume()).await.unwrap();
        let sig1 = receiver.recv().await.unwrap();
        let sig2 = receiver.recv().await.unwrap();
        assert_eq!(sig1.action, SignalAction::PauseSnapshot);
        assert_eq!(sig2.action, SignalAction::ResumeSnapshot);
    }

    #[tokio::test]
    async fn test_signal_channel_try_send() {
        // Buffer of 2: third try_send must fail without blocking.
        let (channel, _receiver) = SignalChannel::new(2);
        assert!(channel.try_send(Signal::pause()).is_ok());
        assert!(channel.try_send(Signal::resume()).is_ok());
        assert!(channel.try_send(Signal::log("overflow")).is_err());
    }

    #[test]
    fn test_signal_serialization() {
        let signal = Signal::execute_snapshot(vec!["public.users".to_string()]);
        let json = serde_json::to_string(&signal).unwrap();
        assert!(json.contains("execute-snapshot"));
        assert!(json.contains("public.users"));
        let parsed: Signal = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.action, SignalAction::ExecuteSnapshot);
    }

    #[test]
    fn test_signal_channel_type_str() {
        assert_eq!(SignalChannelType::Source.as_str(), "source");
        assert_eq!(SignalChannelType::Topic.as_str(), "kafka");
        assert_eq!(SignalChannelType::File.as_str(), "file");
        assert_eq!(SignalChannelType::Api.as_str(), "api");
        assert_eq!(SignalChannelType::Jmx.as_str(), "jmx");
    }

    #[test]
    fn test_signal_channel_type_parse() {
        assert_eq!(
            SignalChannelType::parse("source"),
            Some(SignalChannelType::Source)
        );
        assert_eq!(
            SignalChannelType::parse("kafka"),
            Some(SignalChannelType::Topic)
        );
        assert_eq!(
            SignalChannelType::parse("topic"),
            Some(SignalChannelType::Topic)
        );
        assert_eq!(
            SignalChannelType::parse("file"),
            Some(SignalChannelType::File)
        );
        assert_eq!(SignalChannelType::parse("unknown"), None);
    }

    #[test]
    fn test_signal_config_default() {
        let config = SignalConfig::default();
        assert!(config.is_channel_enabled(SignalChannelType::Source));
        assert!(config.is_channel_enabled(SignalChannelType::Topic));
        assert!(!config.is_channel_enabled(SignalChannelType::File));
        assert!(config.signal_data_collection.is_none());
        assert!(config.signal_topic.is_none());
    }

    #[test]
    fn test_signal_config_builder() {
        let config = SignalConfig::builder()
            .enabled_channels(vec![SignalChannelType::Source, SignalChannelType::File])
            .signal_data_collection("public.debezium_signal")
            .signal_file("/tmp/signals.json")
            .signal_poll_interval_ms(500)
            .consumer_property("bootstrap.servers", "localhost:9092")
            .build();
        assert!(config.is_channel_enabled(SignalChannelType::Source));
        assert!(config.is_channel_enabled(SignalChannelType::File));
        assert!(!config.is_channel_enabled(SignalChannelType::Topic));
        assert_eq!(
            config.signal_data_collection,
            Some("public.debezium_signal".to_string())
        );
        assert_eq!(config.signal_file, Some("/tmp/signals.json".to_string()));
        assert_eq!(config.signal_poll_interval_ms, 500);
    }

    #[test]
    fn test_signal_config_table_name() {
        let config = SignalConfig::builder()
            .signal_data_collection("public.debezium_signal")
            .build();
        assert_eq!(config.signal_table_name(), Some("debezium_signal"));
        assert_eq!(config.signal_schema_name(), Some("public"));
    }

    #[test]
    fn test_signal_config_parse_channels() {
        let channels = SignalConfig::parse_enabled_channels("source, kafka, file");
        assert_eq!(channels.len(), 3);
        assert!(channels.contains(&SignalChannelType::Source));
        assert!(channels.contains(&SignalChannelType::Topic));
        assert!(channels.contains(&SignalChannelType::File));
    }

    #[test]
    fn test_signal_record_to_signal() {
        let record = SignalRecord::new("sig-1", "execute-snapshot")
            .with_data(r#"{"data-collections": ["public.users"]}"#);
        let signal = record.to_signal(SignalSource::Source).unwrap();
        assert_eq!(signal.id, "sig-1");
        assert_eq!(signal.action, SignalAction::ExecuteSnapshot);
        assert_eq!(signal.source, SignalSource::Source);
        assert_eq!(signal.data.data_collections, vec!["public.users"]);
    }

    #[test]
    fn test_signal_record_to_signal_no_data() {
        let record = SignalRecord::new("sig-2", "pause-snapshot");
        let signal = record.to_signal(SignalSource::File).unwrap();
        assert_eq!(signal.id, "sig-2");
        assert_eq!(signal.action, SignalAction::PauseSnapshot);
        assert_eq!(signal.source, SignalSource::File);
    }

    #[test]
    fn test_signal_record_invalid_json() {
        let record = SignalRecord::new("sig-3", "log").with_data("not valid json");
        assert!(record.to_signal(SignalSource::Api).is_err());
    }

    #[tokio::test]
    async fn test_source_signal_channel() {
        let mut channel = SourceSignalChannel::new("public.debezium_signal");
        channel.init().await.unwrap();
        assert_eq!(channel.name(), "source");
        let signals = channel.read().await.unwrap();
        assert!(signals.is_empty());
        channel
            .handle_cdc_event(
                "sig-1",
                "execute-snapshot",
                Some(r#"{"data-collections": ["public.orders"]}"#),
            )
            .await
            .unwrap();
        let signals = channel.read().await.unwrap();
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].id, "sig-1");
        assert_eq!(signals[0].signal_type, "execute-snapshot");
        // Reading again must be empty: read() drains the pending queue.
        let signals = channel.read().await.unwrap();
        assert!(signals.is_empty());
        channel.close().await.unwrap();
    }

    #[test]
    fn test_source_signal_channel_is_signal_event() {
        let channel = SourceSignalChannel::new("public.debezium_signal");
        assert!(channel.is_signal_event("public", "debezium_signal"));
        assert!(!channel.is_signal_event("public", "users"));
        assert!(!channel.is_signal_event("other", "debezium_signal"));
    }

    #[tokio::test]
    async fn test_file_signal_channel() {
        let temp_dir = std::env::temp_dir();
        let signal_file = temp_dir.join(format!("rivven_signals_{}.json", uuid::Uuid::new_v4()));
        let content = r#"{"id":"sig-1","type":"execute-snapshot","data":{"data-collections":["public.users"]}}
{"id":"sig-2","type":"pause-snapshot"}
# This is a comment
{"id":"sig-3","type":"log","data":{"message":"Hello"}}"#;
        tokio::fs::write(&signal_file, content).await.unwrap();
        let mut channel = FileSignalChannel::new(&signal_file);
        channel.init().await.unwrap();
        assert_eq!(channel.name(), "file");
        let signals = channel.read().await.unwrap();
        assert_eq!(signals.len(), 3);
        assert_eq!(signals[0].id, "sig-1");
        assert_eq!(signals[1].id, "sig-2");
        assert_eq!(signals[2].id, "sig-3");
        // Unmodified file: nothing new on the second read.
        let signals = channel.read().await.unwrap();
        assert!(signals.is_empty());
        channel.close().await.unwrap();
        let _ = tokio::fs::remove_file(&signal_file).await;
    }

    #[tokio::test]
    async fn test_signal_manager() {
        let config = SignalConfig::builder()
            .enabled_channels(vec![SignalChannelType::Source])
            .signal_data_collection("public.debezium_signal")
            .build();
        let processor = Arc::new(SignalProcessor::new());
        let mut manager = SignalManager::new(config, processor.clone());
        let source_channel = manager.create_source_channel().unwrap();
        let pending = source_channel.pending_signals();
        manager.add_channel(Box::new(source_channel));
        manager.init().await.unwrap();
        assert!(manager.is_running());
        // Inject a record directly through the shared pending queue.
        pending
            .write()
            .await
            .push(SignalRecord::new("sig-1", "pause-snapshot"));
        let count = manager.poll().await.unwrap();
        assert_eq!(count, 1);
        assert!(processor.is_paused());
        manager.close().await.unwrap();
        assert!(!manager.is_running());
    }
}