use std::sync::Arc;
use dataflow_rs::engine::error::DataflowError;
use dataflow_rs::engine::functions::config::FunctionConfig;
use dataflow_rs::engine::message::{Change, Message};
use serde_json::Value;
use crate::connector::{
CacheConnectorConfig, ConnectorConfig, ConnectorRegistry, DbConnectorConfig,
};
pub fn extract_output_path(input: &Value) -> &str {
input
.get("output")
.and_then(|v| v.as_str())
.unwrap_or("data")
}
pub fn to_exec_error(e: impl std::fmt::Display) -> DataflowError {
DataflowError::function_execution(e.to_string(), None)
}
pub fn extract_custom_input<'a>(
config: &'a FunctionConfig,
handler_name: &str,
) -> Result<&'a Value, DataflowError> {
match config {
FunctionConfig::Custom { input, .. } => Ok(input),
_ => Err(DataflowError::Validation(format!(
"Expected Custom config for {handler_name}"
))),
}
}
pub fn require_str_field<'a>(
input: &'a Value,
field: &str,
handler_name: &str,
) -> Result<&'a str, DataflowError> {
input.get(field).and_then(|v| v.as_str()).ok_or_else(|| {
DataflowError::Validation(format!("{handler_name} requires '{field}' field"))
})
}
pub async fn resolve_connector(
registry: &ConnectorRegistry,
name: &str,
) -> Result<Arc<ConnectorConfig>, DataflowError> {
registry.get(name).await.ok_or_else(|| {
DataflowError::function_execution(format!("Connector '{name}' not found"), None)
})
}
pub fn require_db_connector<'a>(
config: &'a ConnectorConfig,
name: &str,
) -> Result<&'a DbConnectorConfig, DataflowError> {
match config {
ConnectorConfig::Db(c) => Ok(c),
_ => Err(DataflowError::Validation(format!(
"Connector '{name}' is not a database connector"
))),
}
}
pub fn require_cache_connector<'a>(
config: &'a ConnectorConfig,
name: &str,
) -> Result<&'a CacheConnectorConfig, DataflowError> {
match config {
ConnectorConfig::Cache(c) => Ok(c),
_ => Err(DataflowError::Validation(format!(
"Connector '{name}' is not a cache connector"
))),
}
}
pub fn apply_output(message: &mut Message, output_path: &str, new_value: Value) -> Vec<Change> {
let old_value = super::http_common::get_nested(&message.context, output_path);
super::http_common::set_nested(&mut message.context, output_path, new_value.clone());
message.invalidate_context_cache();
vec![Change {
path: Arc::from(output_path),
old_value: Arc::new(old_value),
new_value: Arc::new(new_value),
}]
}
pub fn bind_json_params<'q>(
mut query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>,
params: &'q [Value],
) -> sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>> {
for param in params {
query = match param {
Value::String(s) => query.bind(s.as_str()),
Value::Number(n) => {
if let Some(i) = n.as_i64() {
query.bind(i)
} else if let Some(f) = n.as_f64() {
query.bind(f)
} else {
query.bind(n.to_string())
}
}
Value::Bool(b) => query.bind(*b),
Value::Null => query.bind(None::<String>),
_ => query.bind(param.to_string()),
};
}
query
}
pub async fn timed_query<F, T, E>(
timeout_ms: Option<u64>,
handler_name: &str,
operation: F,
) -> Result<T, DataflowError>
where
F: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let ms = timeout_ms.unwrap_or(30_000);
tokio::time::timeout(std::time::Duration::from_millis(ms), operation)
.await
.map_err(|_| {
DataflowError::Timeout(format!("{handler_name} query timed out after {ms}ms"))
})?
.map_err(|e| {
DataflowError::function_execution(format!("{handler_name} query failed: {e}"), None)
})
}