xerv_nodes/triggers/
queue.rs

1//! Queue trigger (message queue).
2//!
3//! In-memory message queue trigger for testing and lightweight use cases.
4//! For production, consider using Kafka or a dedicated message broker.
5
6use parking_lot::RwLock;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::sync::mpsc;
10use xerv_core::error::{Result, XervError};
11use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
12use xerv_core::types::RelPtr;
13
14/// Message in the queue.
15#[derive(Debug, Clone)]
16pub struct QueueMessage {
17    /// Message payload.
18    pub payload: Vec<u8>,
19    /// Optional message key.
20    pub key: Option<String>,
21    /// Optional headers.
22    pub headers: Vec<(String, String)>,
23}
24
25impl QueueMessage {
26    /// Create a new message with payload.
27    pub fn new(payload: impl Into<Vec<u8>>) -> Self {
28        Self {
29            payload: payload.into(),
30            key: None,
31            headers: Vec::new(),
32        }
33    }
34
35    /// Create a message from a string.
36    pub fn from_string(s: impl Into<String>) -> Self {
37        Self::new(s.into().into_bytes())
38    }
39
40    /// Create a message from JSON.
41    pub fn from_json<T: serde::Serialize>(value: &T) -> Result<Self> {
42        let bytes = serde_json::to_vec(value).map_err(|e| XervError::ConfigValue {
43            field: "payload".to_string(),
44            cause: format!("Failed to serialize JSON: {}", e),
45        })?;
46        Ok(Self::new(bytes))
47    }
48
49    /// Set the message key.
50    pub fn with_key(mut self, key: impl Into<String>) -> Self {
51        self.key = Some(key.into());
52        self
53    }
54
55    /// Add a header.
56    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
57        self.headers.push((key.into(), value.into()));
58        self
59    }
60}
61
62/// State for the queue trigger.
63struct QueueState {
64    /// Whether the trigger is running.
65    running: AtomicBool,
66    /// Whether the trigger is paused.
67    paused: AtomicBool,
68    /// Shutdown signal sender.
69    shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
70    /// Message sender for pushing messages to the queue.
71    message_tx: RwLock<Option<mpsc::Sender<QueueMessage>>>,
72}
73
74/// Queue handle for sending messages.
75#[derive(Clone)]
76pub struct QueueHandle {
77    tx: mpsc::Sender<QueueMessage>,
78}
79
80impl QueueHandle {
81    /// Send a message to the queue.
82    pub async fn send(&self, message: QueueMessage) -> Result<()> {
83        self.tx.send(message).await.map_err(|e| XervError::Network {
84            cause: format!("Failed to send message: {}", e),
85        })
86    }
87
88    /// Send a string message.
89    pub async fn send_string(&self, s: impl Into<String>) -> Result<()> {
90        self.send(QueueMessage::from_string(s)).await
91    }
92
93    /// Send a JSON message.
94    pub async fn send_json<T: serde::Serialize>(&self, value: &T) -> Result<()> {
95        self.send(QueueMessage::from_json(value)?).await
96    }
97}
98
99/// In-memory queue trigger.
100///
101/// Provides an in-memory message queue for testing and lightweight scenarios.
102/// Messages can be pushed via the `QueueHandle`.
103///
104/// # Configuration
105///
106/// ```yaml
107/// triggers:
108///   - id: event_queue
109///     type: trigger::queue
110///     params:
111///       buffer_size: 1000
112/// ```
113///
114/// # Parameters
115///
116/// - `buffer_size` - Maximum messages to buffer (default: 100)
117pub struct QueueTrigger {
118    /// Trigger ID.
119    id: String,
120    /// Buffer size for the queue.
121    buffer_size: usize,
122    /// Internal state.
123    state: Arc<QueueState>,
124}
125
126impl QueueTrigger {
127    /// Create a new queue trigger.
128    pub fn new(id: impl Into<String>) -> Self {
129        Self {
130            id: id.into(),
131            buffer_size: 100,
132            state: Arc::new(QueueState {
133                running: AtomicBool::new(false),
134                paused: AtomicBool::new(false),
135                shutdown_tx: RwLock::new(None),
136                message_tx: RwLock::new(None),
137            }),
138        }
139    }
140
141    /// Create from configuration.
142    pub fn from_config(config: &TriggerConfig) -> Result<Self> {
143        let buffer_size = config.get_i64("buffer_size").unwrap_or(100) as usize;
144
145        Ok(Self {
146            id: config.id.clone(),
147            buffer_size,
148            state: Arc::new(QueueState {
149                running: AtomicBool::new(false),
150                paused: AtomicBool::new(false),
151                shutdown_tx: RwLock::new(None),
152                message_tx: RwLock::new(None),
153            }),
154        })
155    }
156
157    /// Set the buffer size.
158    pub fn with_buffer_size(mut self, size: usize) -> Self {
159        self.buffer_size = size;
160        self
161    }
162
163    /// Get a handle to send messages to this queue.
164    ///
165    /// Returns `None` if the trigger hasn't been started yet.
166    pub fn handle(&self) -> Option<QueueHandle> {
167        self.state
168            .message_tx
169            .read()
170            .as_ref()
171            .map(|tx| QueueHandle { tx: tx.clone() })
172    }
173}
174
175impl Trigger for QueueTrigger {
176    fn trigger_type(&self) -> TriggerType {
177        TriggerType::Queue
178    }
179
180    fn id(&self) -> &str {
181        &self.id
182    }
183
184    fn start<'a>(
185        &'a self,
186        callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
187    ) -> TriggerFuture<'a, ()> {
188        let state = self.state.clone();
189        let buffer_size = self.buffer_size;
190        let trigger_id = self.id.clone();
191
192        Box::pin(async move {
193            if state.running.load(Ordering::SeqCst) {
194                return Err(XervError::ConfigValue {
195                    field: "trigger".to_string(),
196                    cause: "Trigger is already running".to_string(),
197                });
198            }
199
200            tracing::info!(
201                trigger_id = %trigger_id,
202                buffer_size = buffer_size,
203                "Queue trigger started"
204            );
205
206            state.running.store(true, Ordering::SeqCst);
207
208            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
209            *state.shutdown_tx.write() = Some(shutdown_tx);
210
211            // Create message channel
212            let (msg_tx, mut msg_rx) = mpsc::channel(buffer_size);
213            *state.message_tx.write() = Some(msg_tx);
214
215            let callback = Arc::new(callback);
216
217            loop {
218                tokio::select! {
219                    _ = &mut shutdown_rx => {
220                        tracing::info!(trigger_id = %trigger_id, "Queue trigger shutting down");
221                        break;
222                    }
223                    Some(message) = msg_rx.recv() => {
224                        if state.paused.load(Ordering::SeqCst) {
225                            tracing::debug!(trigger_id = %trigger_id, "Trigger paused, dropping message");
226                            continue;
227                        }
228
229                        let metadata = format!(
230                            "payload_size={},key={}",
231                            message.payload.len(),
232                            message.key.as_deref().unwrap_or("none")
233                        );
234
235                        // Create trigger event
236                        let event = TriggerEvent::new(&trigger_id, RelPtr::null())
237                            .with_metadata(metadata);
238
239                        tracing::debug!(
240                            trigger_id = %trigger_id,
241                            trace_id = %event.trace_id,
242                            payload_size = message.payload.len(),
243                            key = ?message.key,
244                            "Queue message received"
245                        );
246
247                        callback(event);
248                    }
249                }
250            }
251
252            // Clean up
253            *state.message_tx.write() = None;
254            state.running.store(false, Ordering::SeqCst);
255            Ok(())
256        })
257    }
258
259    fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
260        let state = self.state.clone();
261        let trigger_id = self.id.clone();
262
263        Box::pin(async move {
264            if let Some(tx) = state.shutdown_tx.write().take() {
265                let _ = tx.send(());
266                tracing::info!(trigger_id = %trigger_id, "Queue trigger stopped");
267            }
268            *state.message_tx.write() = None;
269            state.running.store(false, Ordering::SeqCst);
270            Ok(())
271        })
272    }
273
274    fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
275        let state = self.state.clone();
276        let trigger_id = self.id.clone();
277
278        Box::pin(async move {
279            state.paused.store(true, Ordering::SeqCst);
280            tracing::info!(trigger_id = %trigger_id, "Queue trigger paused");
281            Ok(())
282        })
283    }
284
285    fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
286        let state = self.state.clone();
287        let trigger_id = self.id.clone();
288
289        Box::pin(async move {
290            state.paused.store(false, Ordering::SeqCst);
291            tracing::info!(trigger_id = %trigger_id, "Queue trigger resumed");
292            Ok(())
293        })
294    }
295
296    fn is_running(&self) -> bool {
297        self.state.running.load(Ordering::SeqCst)
298    }
299}
300
301#[cfg(test)]
302mod tests {
303    use super::*;
304
305    #[test]
306    fn queue_trigger_creation() {
307        let trigger = QueueTrigger::new("test_queue");
308        assert_eq!(trigger.id(), "test_queue");
309        assert_eq!(trigger.trigger_type(), TriggerType::Queue);
310        assert!(!trigger.is_running());
311    }
312
313    #[test]
314    fn queue_trigger_from_config() {
315        let mut params = serde_yaml::Mapping::new();
316        params.insert(
317            serde_yaml::Value::String("buffer_size".to_string()),
318            serde_yaml::Value::Number(500.into()),
319        );
320
321        let config = TriggerConfig::new("queue_test", TriggerType::Queue)
322            .with_params(serde_yaml::Value::Mapping(params));
323
324        let trigger = QueueTrigger::from_config(&config).unwrap();
325        assert_eq!(trigger.id(), "queue_test");
326        assert_eq!(trigger.buffer_size, 500);
327    }
328
329    #[test]
330    fn queue_message_creation() {
331        let msg = QueueMessage::new(vec![1, 2, 3])
332            .with_key("test-key")
333            .with_header("content-type", "application/json");
334
335        assert_eq!(msg.payload, vec![1, 2, 3]);
336        assert_eq!(msg.key, Some("test-key".to_string()));
337        assert_eq!(msg.headers.len(), 1);
338    }
339
340    #[test]
341    fn queue_message_from_string() {
342        let msg = QueueMessage::from_string("hello world");
343        assert_eq!(msg.payload, b"hello world");
344    }
345
346    #[test]
347    fn queue_message_from_json() {
348        let data = serde_json::json!({"key": "value"});
349        let msg = QueueMessage::from_json(&data).unwrap();
350        assert!(!msg.payload.is_empty());
351    }
352
353    #[test]
354    fn queue_trigger_no_handle_before_start() {
355        let trigger = QueueTrigger::new("test");
356        assert!(trigger.handle().is_none());
357    }
358}