pub mod functions;
pub mod profile;
pub mod utils;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::connector::ConnectorRegistry;
use crate::storage::models::{Channel, Workflow};
use crate::storage::repositories::workflows::{
workflow_to_dataflow, workflow_to_dataflow_with_rollout,
};
/// Waits for shared (read) access to the engine slot and returns a clone of
/// the `Arc<Engine>` stored in it.
///
/// The read guard lives only for the duration of this call, so the lock is not
/// held while the caller uses the returned engine. The time spent waiting for
/// the lock is reported to `crate::metrics::record_engine_lock_wait` (label
/// `"read"`, as seconds) and to `profile::record_engine_lock_wait`.
pub async fn acquire_engine_read(
    lock: &RwLock<Arc<dataflow_rs::Engine>>,
) -> Arc<dataflow_rs::Engine> {
    let wait_started = std::time::Instant::now();
    let read_guard = lock.read().await;
    let waited = wait_started.elapsed();

    crate::metrics::record_engine_lock_wait("read", waited.as_secs_f64());
    profile::record_engine_lock_wait(waited);

    // Cloning the Arc is only a refcount bump; the guard is dropped on return.
    Arc::clone(&*read_guard)
}
/// Waits for exclusive (write) access to the engine slot and hands the guard
/// back to the caller, who holds the lock until the guard is dropped.
///
/// The wait time is reported to `crate::metrics::record_engine_lock_wait`
/// with label `"write"` (as seconds). Unlike `acquire_engine_read`, it is not
/// reported to `profile::record_engine_lock_wait`.
pub async fn acquire_engine_write(
    lock: &RwLock<Arc<dataflow_rs::Engine>>,
) -> tokio::sync::RwLockWriteGuard<'_, Arc<dataflow_rs::Engine>> {
    let requested_at = std::time::Instant::now();
    let write_guard = lock.write().await;
    let waited_secs = requested_at.elapsed().as_secs_f64();
    crate::metrics::record_engine_lock_wait("write", waited_secs);
    write_guard
}
/// Names of every workflow task function known to this engine setup.
///
/// `channel_call` and the entries of `CONNECTOR_FUNCTIONS` are the custom
/// handlers registered by `build_custom_functions`. The remaining names
/// (`map`, `validation`, `validate`, `parse_json`, `parse_xml`, `publish_json`,
/// `publish_xml`, `filter`, `log`) are not registered in this file; presumably
/// they are `dataflow_rs` built-ins (TODO confirm against the engine version in use).
/// NOTE(review): presumably used to validate task function names. Confirm at call
/// sites, and keep this list in sync with `build_custom_functions` when adding a handler.
pub const KNOWN_FUNCTIONS: &[&str] = &[
"map",
// NOTE(review): `validation` and `validate` are both listed; presumably aliases — confirm.
"validation",
"validate",
"parse_json",
"parse_xml",
"publish_json",
"publish_xml",
"filter",
"log",
"http_call",
"publish_kafka",
"db_read",
"db_write",
"cache_read",
"cache_write",
"mongo_read",
"channel_call",
];
/// Subset of `KNOWN_FUNCTIONS` whose handlers `build_custom_functions`
/// constructs with the shared `ConnectorRegistry`.
///
/// `channel_call` is not listed: its handler is built with the engine lock
/// and call limits instead of the registry.
pub const CONNECTOR_FUNCTIONS: &[&str] = &[
"http_call",
"publish_kafka",
"db_read",
"db_write",
"cache_read",
"cache_write",
"mongo_read",
];
/// Builds the table of custom function handlers, keyed by the name a workflow
/// task uses to invoke them.
///
/// - `channel_call` receives the shared engine lock plus
///   `engine_config.max_channel_call_depth` and
///   `engine_config.default_channel_call_timeout_ms`.
/// - Every other handler receives the connector registry. The SQL, cache and
///   Mongo handlers also receive their respective pool caches, and `http_call`
///   receives the HTTP client.
/// - `publish_kafka` is registered with `producer: None`. Use
///   `register_kafka_publisher` to replace it with a producer-backed handler.
pub fn build_custom_functions(
    registry: Arc<ConnectorRegistry>,
    client: reqwest::Client,
    engine: Arc<tokio::sync::RwLock<Arc<dataflow_rs::Engine>>>,
    engine_config: &crate::config::EngineConfig,
    cache_pool: Arc<crate::connector::cache_backend::CachePool>,
    sql_pool_cache: Arc<crate::connector::pool_cache::SqlPoolCache>,
    mongo_pool_cache: Arc<crate::connector::mongo_pool::MongoPoolCache>,
) -> HashMap<String, dataflow_rs::BoxedFunctionHandler> {
    // Entries are evaluated in order, so every `Arc::clone` of a shared pool
    // happens before the final entry that moves the original in.
    let handlers: [(&str, dataflow_rs::BoxedFunctionHandler); 8] = [
        (
            "http_call",
            Box::new(functions::http_call::HttpCallHandler {
                registry: Arc::clone(&registry),
                client,
            }),
        ),
        (
            "channel_call",
            Box::new(functions::channel_call::ChannelCallHandler {
                engine,
                max_call_depth: engine_config.max_channel_call_depth,
                default_timeout_ms: engine_config.default_channel_call_timeout_ms,
            }),
        ),
        (
            "publish_kafka",
            Box::new(functions::publish_kafka::PublishKafkaHandler {
                registry: Arc::clone(&registry),
                producer: None,
            }),
        ),
        (
            "db_read",
            Box::new(functions::db_read::DbReadHandler {
                pool_cache: Arc::clone(&sql_pool_cache),
                registry: Arc::clone(&registry),
            }),
        ),
        (
            "db_write",
            Box::new(functions::db_write::DbWriteHandler {
                pool_cache: sql_pool_cache,
                registry: Arc::clone(&registry),
            }),
        ),
        (
            "cache_read",
            Box::new(functions::cache_read::CacheReadHandler {
                cache_pool: Arc::clone(&cache_pool),
                registry: Arc::clone(&registry),
            }),
        ),
        (
            "cache_write",
            Box::new(functions::cache_write::CacheWriteHandler {
                cache_pool,
                registry: Arc::clone(&registry),
            }),
        ),
        (
            "mongo_read",
            Box::new(functions::mongo_read::MongoReadHandler {
                pool_cache: mongo_pool_cache,
                registry,
            }),
        ),
    ];

    let mut fns = HashMap::with_capacity(handlers.len());
    for (name, handler) in handlers {
        fns.insert(name.to_owned(), handler);
    }
    fns
}
/// Registers a `publish_kafka` handler backed by `producer` in `fns`.
///
/// `HashMap::insert` overwrites any existing `publish_kafka` entry. If `fns`
/// came from `build_custom_functions`, this replaces its producer-less handler.
pub fn register_kafka_publisher(
    fns: &mut HashMap<String, dataflow_rs::BoxedFunctionHandler>,
    registry: Arc<ConnectorRegistry>,
    producer: Arc<crate::kafka::producer::KafkaProducer>,
) {
    let handler = functions::publish_kafka::PublishKafkaHandler {
        registry,
        producer: Some(producer),
    };
    fns.insert(String::from("publish_kafka"), Box::new(handler));
}
/// Applies the include/exclude glob lists from `config` to channel names.
///
/// Patterns use `*` wildcards (see `glob_match`) and are matched against
/// `Channel::name`. A channel is kept when both of these hold:
/// - `include` is empty, or the name matches at least one `include` pattern;
/// - the name matches no `exclude` pattern.
///
/// `exclude` therefore wins over `include`. With both lists empty, the input
/// vector is returned as-is. Input order is preserved.
pub fn filter_channels(
    channels: Vec<Channel>,
    config: &crate::config::ChannelLoadingConfig,
) -> Vec<Channel> {
    if config.include.is_empty() && config.exclude.is_empty() {
        return channels;
    }
    let mut kept = channels;
    kept.retain(|ch| {
        let name = ch.name.as_str();
        let included =
            config.include.is_empty() || config.include.iter().any(|p| glob_match(p, name));
        included && !config.exclude.iter().any(|p| glob_match(p, name))
    });
    kept
}
/// Matches `name` against a glob `pattern` where `*` stands for any (possibly
/// empty) run of characters. No other wildcard syntax is supported.
///
/// A pattern without `*` must equal `name` exactly. Otherwise the pattern is
/// split on `*`, and all of the following must hold:
/// - the leading literal is a prefix of `name`;
/// - the trailing literal is a suffix of `name`, not overlapping the prefix;
/// - each literal in between occurs, in order and without overlapping, in the
///   text between the prefix and the suffix.
///
/// The trailing literal is anchored with `strip_suffix` rather than searched
/// for. This way names where it also occurs earlier still match, e.g.
/// `*-debug` against `a-debug-debug`.
fn glob_match(pattern: &str, name: &str) -> bool {
    let segments: Vec<&str> = pattern.split('*').collect();
    let [first, middle @ .., last] = segments.as_slice() else {
        // `split` yields a single segment only when the pattern has no `*`.
        return pattern == name;
    };
    // Strip the prefix first, then the suffix from what remains, so the two
    // anchored literals can never claim the same characters of `name`.
    let Some(rest) = name.strip_prefix(*first) else {
        return false;
    };
    let Some(mut rest) = rest.strip_suffix(*last) else {
        return false;
    };
    // Leftmost matching is optimal for the floating middle literals: taking
    // the earliest occurrence leaves the most room for those that follow.
    for segment in middle.iter().filter(|s| !s.is_empty()) {
        match rest.find(*segment) {
            Some(at) => rest = &rest[at + segment.len()..],
            None => return false,
        }
    }
    true
}
/// Translates stored channels and their workflow versions into the
/// `dataflow_rs::Workflow` list the engine is built from.
///
/// Workflow rows are grouped by `workflow_id`. Then, for each channel:
/// - If the channel has no `workflow_id`, or its id has no rows in
///   `workflows`, a warning is logged and the channel is skipped.
/// - If exactly one version exists and its `rollout_percentage` is 100, it is
///   converted with `workflow_to_dataflow`.
/// - Otherwise every version is processed, highest `version` first. Each one
///   is converted with `workflow_to_dataflow_with_rollout`, using
///   `bucket_min` = the running offset (starting at 0) and
///   `bucket_max` = `bucket_min + rollout_percentage`.
///
/// Conversion failures are logged and skipped. In the rollout case, a failed
/// version still advances the running offset, so later versions keep the same
/// bucket bounds.
pub fn build_engine_workflows(
    channels: &[Channel],
    workflows: &[Workflow],
) -> Vec<dataflow_rs::Workflow> {
    let mut versions_by_id: HashMap<String, Vec<&Workflow>> = HashMap::new();
    for wf in workflows {
        versions_by_id.entry(wf.workflow_id.clone()).or_default().push(wf);
    }

    let mut built = Vec::new();
    for channel in channels {
        let wf_id = match &channel.workflow_id {
            Some(id) => id,
            None => {
                tracing::warn!(
                    channel_id = %channel.channel_id,
                    channel_name = %channel.name,
                    "Channel has no workflow_id, skipping"
                );
                continue;
            }
        };
        let versions = match versions_by_id.get(wf_id) {
            Some(found) => found,
            None => {
                tracing::warn!(
                    channel_id = %channel.channel_id,
                    workflow_id = %wf_id,
                    "Workflow not found for channel, skipping"
                );
                continue;
            }
        };

        match versions.as_slice() {
            // A lone version taking all traffic needs no bucket bounds.
            [only] if only.rollout_percentage == 100 => {
                match workflow_to_dataflow(*only, &channel.name) {
                    Ok(converted) => built.push(converted),
                    Err(e) => {
                        tracing::warn!(
                            workflow_id = %wf_id,
                            channel = %channel.name,
                            error = %e,
                            "Failed to convert workflow to dataflow, skipping"
                        );
                    }
                }
            }
            _ => {
                // Newest version first; each claims the next contiguous slice
                // of the bucket space, starting at 0.
                let mut newest_first: Vec<&Workflow> = versions.to_vec();
                newest_first.sort_by_key(|v| std::cmp::Reverse(v.version));
                let mut next_min = 0i64;
                for wf in newest_first {
                    let bucket_min = next_min;
                    let bucket_max = bucket_min + wf.rollout_percentage;
                    next_min = bucket_max;
                    match workflow_to_dataflow_with_rollout(
                        wf,
                        &channel.name,
                        bucket_min,
                        bucket_max,
                    ) {
                        Ok(converted) => built.push(converted),
                        Err(e) => {
                            tracing::warn!(
                                workflow_id = %wf.workflow_id,
                                version = wf.version,
                                channel = %channel.name,
                                error = %e,
                                "Failed to convert workflow version to dataflow, skipping"
                            );
                        }
                    }
                }
            }
        }
    }
    built
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `glob_match(pattern, name)` returns `expected` for every row.
    fn assert_glob_cases(cases: &[(&str, &str, bool)]) {
        for &(pattern, name, expected) in cases {
            assert_eq!(
                glob_match(pattern, name),
                expected,
                "glob_match({pattern:?}, {name:?})"
            );
        }
    }

    #[test]
    fn test_glob_match_exact() {
        assert_glob_cases(&[("orders", "orders", true), ("orders", "events", false)]);
    }

    #[test]
    fn test_glob_match_prefix_wildcard() {
        assert_glob_cases(&[
            ("internal-*", "internal-debug", true),
            ("internal-*", "internal-", true),
            ("internal-*", "external-debug", false),
        ]);
    }

    #[test]
    fn test_glob_match_suffix_wildcard() {
        assert_glob_cases(&[
            ("*-debug", "internal-debug", true),
            ("*-debug", "internal-prod", false),
        ]);
    }

    #[test]
    fn test_glob_match_star_only() {
        assert_glob_cases(&[("*", "anything", true), ("*", "", true)]);
    }

    #[test]
    fn test_glob_match_middle_wildcard() {
        assert_glob_cases(&[
            ("pre*suf", "presuf", true),
            ("pre*suf", "pre-middle-suf", true),
            ("pre*suf", "pre-middle", false),
        ]);
    }

    /// Builds an active, synchronous HTTP `POST` channel whose id and name are
    /// both `name` and which has no workflow attached.
    fn make_channel(name: &str) -> Channel {
        use crate::storage::models::{ChannelProtocol, EntityStatus};
        Channel {
            channel_id: name.to_string(),
            name: name.to_string(),
            version: 1,
            status: EntityStatus::Active.as_str().to_string(),
            channel_type: "sync".to_string(),
            protocol: ChannelProtocol::Http.as_str().to_string(),
            methods: Some("POST".to_string()),
            workflow_id: None,
            topic: None,
            consumer_group: None,
            route_pattern: None,
            description: None,
            transport_config_json: "{}".to_string(),
            config_json: "{}".to_string(),
            priority: 0,
            created_at: chrono::NaiveDateTime::default(),
            updated_at: chrono::NaiveDateTime::default(),
        }
    }

    /// Builds a loading config from borrowed include/exclude pattern lists.
    fn loading_config(include: &[&str], exclude: &[&str]) -> crate::config::ChannelLoadingConfig {
        crate::config::ChannelLoadingConfig {
            include: include.iter().map(|p| p.to_string()).collect(),
            exclude: exclude.iter().map(|p| p.to_string()).collect(),
        }
    }

    /// Runs `filter_channels` over channels with the given names and returns
    /// the names that survive, in order.
    fn surviving(names: &[&str], config: &crate::config::ChannelLoadingConfig) -> Vec<String> {
        let channels: Vec<Channel> = names.iter().copied().map(make_channel).collect();
        filter_channels(channels, config)
            .into_iter()
            .map(|ch| ch.name)
            .collect()
    }

    #[test]
    fn test_filter_channels_no_config() {
        let config = crate::config::ChannelLoadingConfig::default();
        assert_eq!(surviving(&["orders", "events"], &config).len(), 2);
    }

    #[test]
    fn test_filter_channels_include_only() {
        let config = loading_config(&["orders", "events"], &[]);
        let kept = surviving(&["orders", "events", "internal-debug"], &config);
        assert_eq!(kept.len(), 2);
        assert!(kept.iter().all(|n| n != "internal-debug"));
    }

    #[test]
    fn test_filter_channels_exclude_only() {
        let config = loading_config(&[], &["internal-*"]);
        let kept = surviving(&["orders", "events", "internal-debug"], &config);
        assert_eq!(kept.len(), 2);
        assert!(kept.iter().all(|n| n != "internal-debug"));
    }

    #[test]
    fn test_filter_channels_include_and_exclude() {
        let config = loading_config(&["orders*"], &["*-debug"]);
        let kept = surviving(&["orders", "orders-debug", "events"], &config);
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0], "orders");
    }
}