use serde::{Deserialize, Serialize};
use std::time::Duration;
use thiserror::Error;
/// Errors reported by the trigger subsystem.
///
/// `Display` output for each variant comes from its `#[error(...)]` attribute.
/// Variants marked `#[from]` also get a `From` conversion, so `?` can lift the
/// wrapped error into a `TriggerError`.
#[derive(Debug, Error)]
pub enum TriggerError {
/// Polling a trigger failed; `message` holds the failure description.
#[error("Trigger poll error: {message}")]
PollError { message: String },
/// Wraps a `crate::error::ContextError` (converted automatically via `#[from]`).
#[error("Context creation error: {0}")]
ContextError(#[from] crate::error::ContextError),
/// No trigger with the given `name` was found.
#[error("Trigger not found: {name}")]
TriggerNotFound { name: String },
/// Wraps a `diesel::result::Error` (converted automatically via `#[from]`).
#[error("Database error: {0}")]
Database(#[from] diesel::result::Error),
/// Connection pool failure, kept as text rather than as the original error value.
#[error("Connection pool error: {0}")]
ConnectionPool(String),
/// Scheduling `workflow` failed; `message` holds the failure description.
#[error("Failed to schedule workflow '{workflow}': {message}")]
WorkflowSchedulingFailed { workflow: String, message: String },
}
impl From<deadpool::managed::PoolError<deadpool_diesel::Error>> for TriggerError {
fn from(err: deadpool::managed::PoolError<deadpool_diesel::Error>) -> Self {
TriggerError::ConnectionPool(err.to_string())
}
}
impl From<cloacina_workflow::TriggerError> for TriggerError {
fn from(err: cloacina_workflow::TriggerError) -> Self {
match err {
cloacina_workflow::TriggerError::PollError { message } => {
TriggerError::PollError { message }
}
cloacina_workflow::TriggerError::ContextError(e) => TriggerError::PollError {
message: format!("Context error: {}", e),
},
}
}
}
pub use cloacina_workflow::TriggerResult;
/// Settings for a single trigger and the workflow it is associated with.
///
/// Derives `Serialize`/`Deserialize` in addition to `Debug` and `Clone`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerConfig {
/// Name of the trigger.
pub name: String,
/// Name of the workflow associated with this trigger.
pub workflow_name: String,
/// Poll interval for the trigger.
/// NOTE(review): presumably the delay between successive polls — confirm against the scheduler.
pub poll_interval: Duration,
/// Concurrency flag; `TriggerConfig::new` initialises it to `false`.
/// NOTE(review): presumably permits overlapping workflow runs when `true` — confirm.
pub allow_concurrent: bool,
/// Enabled flag; `TriggerConfig::new` initialises it to `true`.
/// NOTE(review): presumably a disabled trigger is not polled — confirm.
pub enabled: bool,
}
impl TriggerConfig {
    /// Creates a configuration for the trigger `name` associated with `workflow_name`.
    ///
    /// The new configuration starts with `allow_concurrent` set to `false` and
    /// `enabled` set to `true`; use the `with_*` methods to override either.
    #[must_use]
    pub fn new(name: &str, workflow_name: &str, poll_interval: Duration) -> Self {
        Self {
            name: name.to_owned(),
            workflow_name: workflow_name.to_owned(),
            poll_interval,
            allow_concurrent: false,
            enabled: true,
        }
    }

    /// Returns the configuration with `allow_concurrent` replaced by `allow`.
    ///
    /// Consumes `self`; the returned value must be used, otherwise the change is lost.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn with_allow_concurrent(mut self, allow: bool) -> Self {
        self.allow_concurrent = allow;
        self
    }

    /// Returns the configuration with `enabled` replaced by the given value.
    ///
    /// Consumes `self`; the returned value must be used, otherwise the change is lost.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn with_enabled(mut self, enabled: bool) -> Self {
        self.enabled = enabled;
        self
    }
}
pub use cloacina_workflow::Trigger;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Context;
    use async_trait::async_trait;

    /// Minimal [`Trigger`] whose poll outcome is fixed by `should_fire`.
    #[derive(Debug)]
    struct TestTrigger {
        name: String,
        should_fire: bool,
    }

    #[async_trait]
    impl Trigger for TestTrigger {
        fn name(&self) -> &str {
            self.name.as_str()
        }

        fn poll_interval(&self) -> Duration {
            Duration::from_secs(1)
        }

        fn allow_concurrent(&self) -> bool {
            false
        }

        async fn poll(&self) -> Result<TriggerResult, cloacina_workflow::TriggerError> {
            let outcome = match self.should_fire {
                true => TriggerResult::Fire(None),
                false => TriggerResult::Skip,
            };
            Ok(outcome)
        }
    }

    /// `Skip` never fires; `Fire` fires with or without a context.
    #[test]
    fn test_trigger_result_should_fire() {
        let skipped = TriggerResult::Skip;
        assert!(!skipped.should_fire());

        let bare_fire = TriggerResult::Fire(None);
        assert!(bare_fire.should_fire());

        let fire_with_ctx = TriggerResult::Fire(Some(Context::new()));
        assert!(fire_with_ctx.should_fire());
    }

    /// Only a `Fire` carrying a context yields one from `into_context`.
    #[test]
    fn test_trigger_result_into_context() {
        for without_ctx in [TriggerResult::Skip, TriggerResult::Fire(None)] {
            assert!(without_ctx.into_context().is_none());
        }

        let with_ctx = TriggerResult::Fire(Some(Context::new()));
        assert!(with_ctx.into_context().is_some());
    }

    /// Fixed hashes for the context-free cases; content-dependent hashes otherwise.
    #[test]
    fn test_trigger_result_context_hash() {
        assert_eq!(TriggerResult::Skip.context_hash(), "skip");
        assert_eq!(TriggerResult::Fire(None).context_hash(), "fire_no_context");

        // Builds a context holding a single "key" entry and hashes the firing result.
        let hash_of = |value: &str| {
            let mut ctx = Context::new();
            ctx.insert("key", serde_json::json!(value)).unwrap();
            TriggerResult::Fire(Some(ctx)).context_hash()
        };

        let first = hash_of("value1");
        let different = hash_of("value2");
        assert_ne!(first, different);

        let repeat = hash_of("value1");
        assert_eq!(first, repeat);
    }

    /// Constructor defaults, followed by builder overrides.
    #[test]
    fn test_trigger_config() {
        let interval = Duration::from_secs(5);
        let defaults = TriggerConfig::new("test", "my_workflow", interval);
        assert_eq!(defaults.name, "test");
        assert_eq!(defaults.workflow_name, "my_workflow");
        assert_eq!(defaults.poll_interval, Duration::from_secs(5));
        assert!(!defaults.allow_concurrent);
        assert!(defaults.enabled);

        let overridden = defaults.with_allow_concurrent(true).with_enabled(false);
        assert!(overridden.allow_concurrent);
        assert!(!overridden.enabled);
    }

    /// Accessors return the configured values and a non-firing trigger skips.
    #[tokio::test]
    async fn test_trigger_trait() {
        let quiet = TestTrigger {
            name: String::from("test_trigger"),
            should_fire: false,
        };

        assert_eq!(quiet.name(), "test_trigger");
        assert_eq!(quiet.poll_interval(), Duration::from_secs(1));
        assert!(!quiet.allow_concurrent());

        let outcome = quiet.poll().await.unwrap();
        assert!(!outcome.should_fire());
    }

    /// A trigger configured to fire reports a firing result.
    #[tokio::test]
    async fn test_trigger_fires() {
        let eager = TestTrigger {
            name: String::from("firing_trigger"),
            should_fire: true,
        };

        let outcome = eager.poll().await.unwrap();
        assert!(outcome.should_fire());
    }
}