xerv_core/traits/
trigger.rs1use crate::error::Result;
4use crate::types::{RelPtr, TraceId};
5use std::future::Future;
6use std::pin::Pin;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
10pub enum TriggerType {
11 Webhook,
13 Kafka,
15 Cron,
17 Queue,
19 Filesystem,
21 Manual,
23 Memory,
25}
26
27impl TriggerType {
28 pub fn as_str(&self) -> &'static str {
30 match self {
31 Self::Webhook => "trigger::webhook",
32 Self::Kafka => "trigger::kafka",
33 Self::Cron => "trigger::cron",
34 Self::Queue => "trigger::queue",
35 Self::Filesystem => "trigger::filesystem",
36 Self::Manual => "trigger::manual",
37 Self::Memory => "trigger::memory",
38 }
39 }
40
41 pub fn from_str(s: &str) -> Option<Self> {
43 match s {
44 "trigger::webhook" | "webhook" => Some(Self::Webhook),
45 "trigger::kafka" | "kafka" => Some(Self::Kafka),
46 "trigger::cron" | "cron" => Some(Self::Cron),
47 "trigger::queue" | "queue" => Some(Self::Queue),
48 "trigger::filesystem" | "filesystem" => Some(Self::Filesystem),
49 "trigger::manual" | "manual" => Some(Self::Manual),
50 "trigger::memory" | "memory" => Some(Self::Memory),
51 _ => None,
52 }
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct TriggerConfig {
59 pub id: String,
61 pub trigger_type: TriggerType,
63 pub params: serde_yaml::Value,
65}
66
67impl TriggerConfig {
68 pub fn new(id: impl Into<String>, trigger_type: TriggerType) -> Self {
70 Self {
71 id: id.into(),
72 trigger_type,
73 params: serde_yaml::Value::Null,
74 }
75 }
76
77 pub fn with_params(mut self, params: serde_yaml::Value) -> Self {
79 self.params = params;
80 self
81 }
82
83 pub fn get_string(&self, key: &str) -> Option<&str> {
85 self.params.get(key).and_then(|v| v.as_str())
86 }
87
88 pub fn get_i64(&self, key: &str) -> Option<i64> {
90 self.params.get(key).and_then(|v| v.as_i64())
91 }
92
93 pub fn get_bool(&self, key: &str) -> Option<bool> {
95 self.params.get(key).and_then(|v| v.as_bool())
96 }
97}
98
99#[derive(Debug, Clone)]
101pub struct TriggerEvent {
102 pub trigger_id: String,
104 pub trace_id: TraceId,
106 pub data: RelPtr<()>,
108 pub schema_hash: u64,
110 pub timestamp_ns: u64,
112 pub metadata: Option<String>,
114}
115
116impl TriggerEvent {
117 pub fn new(trigger_id: impl Into<String>, data: RelPtr<()>) -> Self {
119 Self {
120 trigger_id: trigger_id.into(),
121 trace_id: TraceId::new(),
122 data,
123 schema_hash: 0,
124 timestamp_ns: std::time::SystemTime::now()
125 .duration_since(std::time::UNIX_EPOCH)
126 .map(|d| d.as_nanos() as u64)
127 .unwrap_or(0),
128 metadata: None,
129 }
130 }
131
132 pub fn with_schema_hash(mut self, hash: u64) -> Self {
134 self.schema_hash = hash;
135 self
136 }
137
138 pub fn with_metadata(mut self, metadata: impl Into<String>) -> Self {
140 self.metadata = Some(metadata.into());
141 self
142 }
143}
144
145pub type TriggerFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
147
148pub trait Trigger: Send + Sync {
153 fn trigger_type(&self) -> TriggerType;
155
156 fn id(&self) -> &str;
158
159 fn start<'a>(
164 &'a self,
165 callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
166 ) -> TriggerFuture<'a, ()>;
167
168 fn stop<'a>(&'a self) -> TriggerFuture<'a, ()>;
170
171 fn pause<'a>(&'a self) -> TriggerFuture<'a, ()>;
173
174 fn resume<'a>(&'a self) -> TriggerFuture<'a, ()>;
176
177 fn is_running(&self) -> bool;
179}
180
181pub trait TriggerFactory: Send + Sync {
183 fn trigger_type(&self) -> TriggerType;
185
186 fn create(&self, config: &TriggerConfig) -> Result<Box<dyn Trigger>>;
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[test]
195 fn trigger_type_parsing() {
196 assert_eq!(
197 TriggerType::from_str("trigger::webhook"),
198 Some(TriggerType::Webhook)
199 );
200 assert_eq!(TriggerType::from_str("kafka"), Some(TriggerType::Kafka));
201 assert_eq!(TriggerType::from_str("unknown"), None);
202 }
203
204 #[test]
205 fn trigger_config_params() {
206 let mut params = serde_yaml::Mapping::new();
207 params.insert(
208 serde_yaml::Value::String("method".to_string()),
209 serde_yaml::Value::String("POST".to_string()),
210 );
211 params.insert(
212 serde_yaml::Value::String("port".to_string()),
213 serde_yaml::Value::Number(8080.into()),
214 );
215
216 let config = TriggerConfig::new("my_webhook", TriggerType::Webhook)
217 .with_params(serde_yaml::Value::Mapping(params));
218
219 assert_eq!(config.get_string("method"), Some("POST"));
220 assert_eq!(config.get_i64("port"), Some(8080));
221 }
222}