xerv_core/traits/
trigger.rs

1//! Trigger trait and related types.
2
3use crate::error::Result;
4use crate::types::{RelPtr, TraceId};
5use std::future::Future;
6use std::pin::Pin;
7
8/// Type of trigger.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
10pub enum TriggerType {
11    /// HTTP webhook trigger.
12    Webhook,
13    /// Kafka message trigger.
14    Kafka,
15    /// Cron schedule trigger.
16    Cron,
17    /// Message queue trigger.
18    Queue,
19    /// Filesystem event trigger.
20    Filesystem,
21    /// Manual trigger (for testing).
22    Manual,
23    /// Memory trigger (for benchmarking).
24    Memory,
25}
26
27impl TriggerType {
28    /// Get the string representation.
29    pub fn as_str(&self) -> &'static str {
30        match self {
31            Self::Webhook => "trigger::webhook",
32            Self::Kafka => "trigger::kafka",
33            Self::Cron => "trigger::cron",
34            Self::Queue => "trigger::queue",
35            Self::Filesystem => "trigger::filesystem",
36            Self::Manual => "trigger::manual",
37            Self::Memory => "trigger::memory",
38        }
39    }
40
41    /// Parse from string.
42    pub fn from_str(s: &str) -> Option<Self> {
43        match s {
44            "trigger::webhook" | "webhook" => Some(Self::Webhook),
45            "trigger::kafka" | "kafka" => Some(Self::Kafka),
46            "trigger::cron" | "cron" => Some(Self::Cron),
47            "trigger::queue" | "queue" => Some(Self::Queue),
48            "trigger::filesystem" | "filesystem" => Some(Self::Filesystem),
49            "trigger::manual" | "manual" => Some(Self::Manual),
50            "trigger::memory" | "memory" => Some(Self::Memory),
51            _ => None,
52        }
53    }
54}
55
56/// Configuration for a trigger.
57#[derive(Debug, Clone)]
58pub struct TriggerConfig {
59    /// Unique ID for this trigger instance.
60    pub id: String,
61    /// Type of trigger.
62    pub trigger_type: TriggerType,
63    /// Type-specific parameters (from YAML).
64    pub params: serde_yaml::Value,
65}
66
67impl TriggerConfig {
68    /// Create a new trigger config.
69    pub fn new(id: impl Into<String>, trigger_type: TriggerType) -> Self {
70        Self {
71            id: id.into(),
72            trigger_type,
73            params: serde_yaml::Value::Null,
74        }
75    }
76
77    /// Set parameters.
78    pub fn with_params(mut self, params: serde_yaml::Value) -> Self {
79        self.params = params;
80        self
81    }
82
83    /// Get a string parameter.
84    pub fn get_string(&self, key: &str) -> Option<&str> {
85        self.params.get(key).and_then(|v| v.as_str())
86    }
87
88    /// Get an integer parameter.
89    pub fn get_i64(&self, key: &str) -> Option<i64> {
90        self.params.get(key).and_then(|v| v.as_i64())
91    }
92
93    /// Get a boolean parameter.
94    pub fn get_bool(&self, key: &str) -> Option<bool> {
95        self.params.get(key).and_then(|v| v.as_bool())
96    }
97}
98
99/// An event from a trigger.
100#[derive(Debug, Clone)]
101pub struct TriggerEvent {
102    /// The trigger ID that generated this event.
103    pub trigger_id: String,
104    /// Trace ID assigned to this event.
105    pub trace_id: TraceId,
106    /// Pointer to the event data in the arena.
107    pub data: RelPtr<()>,
108    /// Schema hash of the event data.
109    pub schema_hash: u64,
110    /// Timestamp when the event was received.
111    pub timestamp_ns: u64,
112    /// Optional metadata.
113    pub metadata: Option<String>,
114}
115
116impl TriggerEvent {
117    /// Create a new trigger event.
118    pub fn new(trigger_id: impl Into<String>, data: RelPtr<()>) -> Self {
119        Self {
120            trigger_id: trigger_id.into(),
121            trace_id: TraceId::new(),
122            data,
123            schema_hash: 0,
124            timestamp_ns: std::time::SystemTime::now()
125                .duration_since(std::time::UNIX_EPOCH)
126                .map(|d| d.as_nanos() as u64)
127                .unwrap_or(0),
128            metadata: None,
129        }
130    }
131
132    /// Set the schema hash.
133    pub fn with_schema_hash(mut self, hash: u64) -> Self {
134        self.schema_hash = hash;
135        self
136    }
137
138    /// Set metadata.
139    pub fn with_metadata(mut self, metadata: impl Into<String>) -> Self {
140        self.metadata = Some(metadata.into());
141        self
142    }
143}
144
145/// A boxed future for async trigger operations.
146pub type TriggerFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
147
148/// The trait for XERV triggers.
149///
150/// Triggers are entry points that inject events into a flow.
151/// Each trigger type has its own configuration and event format.
152pub trait Trigger: Send + Sync {
153    /// Get the trigger type.
154    fn trigger_type(&self) -> TriggerType;
155
156    /// Get the trigger ID.
157    fn id(&self) -> &str;
158
159    /// Start the trigger.
160    ///
161    /// The trigger should begin listening for events and call the
162    /// provided callback when events arrive.
163    fn start<'a>(
164        &'a self,
165        callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
166    ) -> TriggerFuture<'a, ()>;
167
168    /// Stop the trigger.
169    fn stop<'a>(&'a self) -> TriggerFuture<'a, ()>;
170
171    /// Pause the trigger (stop accepting new events).
172    fn pause<'a>(&'a self) -> TriggerFuture<'a, ()>;
173
174    /// Resume the trigger.
175    fn resume<'a>(&'a self) -> TriggerFuture<'a, ()>;
176
177    /// Check if the trigger is running.
178    fn is_running(&self) -> bool;
179}
180
181/// A trigger factory that creates trigger instances from configuration.
182pub trait TriggerFactory: Send + Sync {
183    /// Get the trigger type this factory creates.
184    fn trigger_type(&self) -> TriggerType;
185
186    /// Create a new trigger instance from configuration.
187    fn create(&self, config: &TriggerConfig) -> Result<Box<dyn Trigger>>;
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193
194    #[test]
195    fn trigger_type_parsing() {
196        assert_eq!(
197            TriggerType::from_str("trigger::webhook"),
198            Some(TriggerType::Webhook)
199        );
200        assert_eq!(TriggerType::from_str("kafka"), Some(TriggerType::Kafka));
201        assert_eq!(TriggerType::from_str("unknown"), None);
202    }
203
204    #[test]
205    fn trigger_config_params() {
206        let mut params = serde_yaml::Mapping::new();
207        params.insert(
208            serde_yaml::Value::String("method".to_string()),
209            serde_yaml::Value::String("POST".to_string()),
210        );
211        params.insert(
212            serde_yaml::Value::String("port".to_string()),
213            serde_yaml::Value::Number(8080.into()),
214        );
215
216        let config = TriggerConfig::new("my_webhook", TriggerType::Webhook)
217            .with_params(serde_yaml::Value::Mapping(params));
218
219        assert_eq!(config.get_string("method"), Some("POST"));
220        assert_eq!(config.get_i64("port"), Some(8080));
221    }
222}