use crate::error::Result;
use crate::types::{RelPtr, TraceId};
use std::future::Future;
use std::pin::Pin;
/// The kinds of trigger this module can describe.
///
/// Every variant has a canonical, namespaced string name (returned by
/// `TriggerType::as_str`) which is listed on the variant below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TriggerType {
/// Canonical name: `trigger::webhook`.
Webhook,
/// Canonical name: `trigger::kafka`.
Kafka,
/// Canonical name: `trigger::cron`.
Cron,
/// Canonical name: `trigger::queue`.
Queue,
/// Canonical name: `trigger::filesystem`.
Filesystem,
/// Canonical name: `trigger::manual`.
Manual,
/// Canonical name: `trigger::memory`.
Memory,
}
impl TriggerType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Webhook => "trigger::webhook",
Self::Kafka => "trigger::kafka",
Self::Cron => "trigger::cron",
Self::Queue => "trigger::queue",
Self::Filesystem => "trigger::filesystem",
Self::Manual => "trigger::manual",
Self::Memory => "trigger::memory",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"trigger::webhook" | "webhook" => Some(Self::Webhook),
"trigger::kafka" | "kafka" => Some(Self::Kafka),
"trigger::cron" | "cron" => Some(Self::Cron),
"trigger::queue" | "queue" => Some(Self::Queue),
"trigger::filesystem" | "filesystem" => Some(Self::Filesystem),
"trigger::manual" | "manual" => Some(Self::Manual),
"trigger::memory" | "memory" => Some(Self::Memory),
_ => None,
}
}
}
/// Configuration record for a single trigger: an id, a trigger type and a
/// free-form YAML parameter value.
#[derive(Debug, Clone)]
pub struct TriggerConfig {
/// Identifier string supplied by the caller at construction.
pub id: String,
/// Which kind of trigger this configuration is for.
pub trigger_type: TriggerType,
/// Trigger parameters as a YAML value; `TriggerConfig::new` initialises
/// this to `Null`. The typed getters look keys up in this value.
pub params: serde_yaml::Value,
}
impl TriggerConfig {
    /// Creates a configuration with the given id and trigger type.
    ///
    /// `params` starts out as YAML `Null`; use [`TriggerConfig::with_params`]
    /// to attach parameters.
    pub fn new(id: impl Into<String>, trigger_type: TriggerType) -> Self {
        let id = id.into();
        let params = serde_yaml::Value::Null;
        Self {
            id,
            trigger_type,
            params,
        }
    }

    /// Builder-style setter: returns the configuration with `params`
    /// replaced by the given value.
    pub fn with_params(self, params: serde_yaml::Value) -> Self {
        Self { params, ..self }
    }

    /// Looks up `key` in `params` and returns it as a string slice.
    ///
    /// Returns `None` when the lookup fails or the value is not a string.
    pub fn get_string(&self, key: &str) -> Option<&str> {
        self.params.get(key)?.as_str()
    }

    /// Looks up `key` in `params` and returns it as an `i64`.
    ///
    /// Returns `None` when the lookup fails or the value cannot be read as
    /// an `i64`.
    pub fn get_i64(&self, key: &str) -> Option<i64> {
        self.params.get(key)?.as_i64()
    }

    /// Looks up `key` in `params` and returns it as a `bool`.
    ///
    /// Returns `None` when the lookup fails or the value is not a boolean.
    pub fn get_bool(&self, key: &str) -> Option<bool> {
        self.params.get(key)?.as_bool()
    }
}
/// An event record carrying the originating trigger's id, a trace id, a
/// pointer to the payload and some bookkeeping fields.
#[derive(Debug, Clone)]
pub struct TriggerEvent {
/// Id string supplied by the caller when the event is constructed.
pub trigger_id: String,
/// Trace identifier; `TriggerEvent::new` generates it with `TraceId::new()`.
pub trace_id: TraceId,
/// Payload pointer supplied by the caller.
/// NOTE(review): what the `RelPtr` is relative to is not visible here — confirm.
pub data: RelPtr<()>,
/// Schema hash; `0` unless set via `with_schema_hash`.
pub schema_hash: u64,
/// Nanoseconds since the Unix epoch, captured at construction; `0` if the
/// system clock reports a time before the epoch.
pub timestamp_ns: u64,
/// Optional free-form metadata string; `None` unless set via `with_metadata`.
pub metadata: Option<String>,
}
impl TriggerEvent {
    /// Creates an event for `trigger_id` carrying `data`.
    ///
    /// A new `TraceId` is generated and the current wall-clock time is
    /// recorded as nanoseconds since the Unix epoch. `schema_hash` starts at
    /// `0` and `metadata` at `None`.
    pub fn new(trigger_id: impl Into<String>, data: RelPtr<()>) -> Self {
        let trigger_id = trigger_id.into();
        let trace_id = TraceId::new();
        // A clock set before the epoch makes `duration_since` fail; fall back
        // to 0 rather than propagating an error. `as_nanos` yields a `u128`,
        // and the cast keeps only its low 64 bits.
        let timestamp_ns = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos() as u64);

        Self {
            trigger_id,
            trace_id,
            data,
            schema_hash: 0,
            timestamp_ns,
            metadata: None,
        }
    }

    /// Builder-style setter: returns the event with `schema_hash` set to `hash`.
    pub fn with_schema_hash(self, hash: u64) -> Self {
        Self {
            schema_hash: hash,
            ..self
        }
    }

    /// Builder-style setter: returns the event with `metadata` set to
    /// `Some(metadata)`.
    pub fn with_metadata(self, metadata: impl Into<String>) -> Self {
        Self {
            metadata: Some(metadata.into()),
            ..self
        }
    }
}
/// Boxed, pinned, `Send` future that resolves to the crate's `Result<T>` and
/// may borrow data for the lifetime `'a`. Used as the return type of the
/// asynchronous methods on `Trigger`.
pub type TriggerFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
/// Interface implemented by every trigger. Implementors must be `Send + Sync`.
///
/// The lifecycle methods return a `TriggerFuture` borrowing `self`, so they
/// complete asynchronously with the crate's `Result<()>`.
pub trait Trigger: Send + Sync {
/// Returns the kind of this trigger.
fn trigger_type(&self) -> TriggerType;
/// Returns this trigger's identifier.
fn id(&self) -> &str;
/// Starts the trigger with the given event callback.
///
/// The callback must be `Fn(TriggerEvent) + Send + Sync + 'static`.
/// NOTE(review): presumably it is invoked once per produced event, possibly
/// from another thread — confirm against the implementors.
fn start<'a>(
&'a self,
callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
) -> TriggerFuture<'a, ()>;
/// Stops the trigger.
/// NOTE(review): whether a stopped trigger can be started again is
/// implementor-defined — confirm.
fn stop<'a>(&'a self) -> TriggerFuture<'a, ()>;
/// Pauses the trigger.
/// NOTE(review): presumably suspends event delivery until `resume` — confirm.
fn pause<'a>(&'a self) -> TriggerFuture<'a, ()>;
/// Resumes the trigger.
/// NOTE(review): presumably the counterpart of `pause` — confirm.
fn resume<'a>(&'a self) -> TriggerFuture<'a, ()>;
/// Reports whether the trigger is running.
/// NOTE(review): how a paused trigger is reported is not visible here — confirm.
fn is_running(&self) -> bool;
}
/// Factory that builds boxed `Trigger` objects from a `TriggerConfig`.
/// Implementors must be `Send + Sync`.
pub trait TriggerFactory: Send + Sync {
/// Returns the kind of trigger associated with this factory.
fn trigger_type(&self) -> TriggerType;
/// Builds a trigger from `config`, returning the crate's `Result`.
/// NOTE(review): presumably fails when `config` is invalid for this
/// factory's trigger type — confirm against the implementors.
fn create(&self, config: &TriggerConfig) -> Result<Box<dyn Trigger>>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_yaml::{Mapping, Value};

    /// Canonical names, bare short names and unknown strings parse as expected.
    #[test]
    fn trigger_type_parsing() {
        let cases = [
            ("trigger::webhook", Some(TriggerType::Webhook)),
            ("kafka", Some(TriggerType::Kafka)),
            ("unknown", None),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(TriggerType::from_str(input), expected);
        }
    }

    /// Typed getters read string and integer values out of a YAML mapping.
    #[test]
    fn trigger_config_params() {
        let key = |name: &str| Value::String(name.to_owned());

        let mut mapping = Mapping::new();
        mapping.insert(key("method"), Value::String("POST".to_owned()));
        mapping.insert(key("port"), Value::Number(8080.into()));

        let config =
            TriggerConfig::new("my_webhook", TriggerType::Webhook).with_params(Value::Mapping(mapping));

        assert_eq!(config.get_string("method"), Some("POST"));
        assert_eq!(config.get_i64("port"), Some(8080));
    }
}