1use parking_lot::RwLock;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::sync::mpsc;
10use xerv_core::error::{Result, XervError};
11use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
12use xerv_core::types::RelPtr;
13
/// A message pushed into a [`QueueTrigger`] through a [`QueueHandle`].
///
/// Besides `Debug` and `Clone`, the type is comparable (`PartialEq`/`Eq`) and
/// has a `Default` value (empty payload, no key, no headers) so it can be used
/// directly in assertions and struct-update expressions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueMessage {
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Optional key associated with the message.
    pub key: Option<String>,
    /// Header name/value pairs, kept in the order they were added.
    pub headers: Vec<(String, String)>,
}
24
25impl QueueMessage {
26 pub fn new(payload: impl Into<Vec<u8>>) -> Self {
28 Self {
29 payload: payload.into(),
30 key: None,
31 headers: Vec::new(),
32 }
33 }
34
35 pub fn from_string(s: impl Into<String>) -> Self {
37 Self::new(s.into().into_bytes())
38 }
39
40 pub fn from_json<T: serde::Serialize>(value: &T) -> Result<Self> {
42 let bytes = serde_json::to_vec(value).map_err(|e| XervError::ConfigValue {
43 field: "payload".to_string(),
44 cause: format!("Failed to serialize JSON: {}", e),
45 })?;
46 Ok(Self::new(bytes))
47 }
48
49 pub fn with_key(mut self, key: impl Into<String>) -> Self {
51 self.key = Some(key.into());
52 self
53 }
54
55 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
57 self.headers.push((key.into(), value.into()));
58 self
59 }
60}
61
62struct QueueState {
64 running: AtomicBool,
66 paused: AtomicBool,
68 shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
70 message_tx: RwLock<Option<mpsc::Sender<QueueMessage>>>,
72}
73
74#[derive(Clone)]
76pub struct QueueHandle {
77 tx: mpsc::Sender<QueueMessage>,
78}
79
80impl QueueHandle {
81 pub async fn send(&self, message: QueueMessage) -> Result<()> {
83 self.tx.send(message).await.map_err(|e| XervError::Network {
84 cause: format!("Failed to send message: {}", e),
85 })
86 }
87
88 pub async fn send_string(&self, s: impl Into<String>) -> Result<()> {
90 self.send(QueueMessage::from_string(s)).await
91 }
92
93 pub async fn send_json<T: serde::Serialize>(&self, value: &T) -> Result<()> {
95 self.send(QueueMessage::from_json(value)?).await
96 }
97}
98
99pub struct QueueTrigger {
118 id: String,
120 buffer_size: usize,
122 state: Arc<QueueState>,
124}
125
126impl QueueTrigger {
127 pub fn new(id: impl Into<String>) -> Self {
129 Self {
130 id: id.into(),
131 buffer_size: 100,
132 state: Arc::new(QueueState {
133 running: AtomicBool::new(false),
134 paused: AtomicBool::new(false),
135 shutdown_tx: RwLock::new(None),
136 message_tx: RwLock::new(None),
137 }),
138 }
139 }
140
141 pub fn from_config(config: &TriggerConfig) -> Result<Self> {
143 let buffer_size = config.get_i64("buffer_size").unwrap_or(100) as usize;
144
145 Ok(Self {
146 id: config.id.clone(),
147 buffer_size,
148 state: Arc::new(QueueState {
149 running: AtomicBool::new(false),
150 paused: AtomicBool::new(false),
151 shutdown_tx: RwLock::new(None),
152 message_tx: RwLock::new(None),
153 }),
154 })
155 }
156
157 pub fn with_buffer_size(mut self, size: usize) -> Self {
159 self.buffer_size = size;
160 self
161 }
162
163 pub fn handle(&self) -> Option<QueueHandle> {
167 self.state
168 .message_tx
169 .read()
170 .as_ref()
171 .map(|tx| QueueHandle { tx: tx.clone() })
172 }
173}
174
175impl Trigger for QueueTrigger {
176 fn trigger_type(&self) -> TriggerType {
177 TriggerType::Queue
178 }
179
180 fn id(&self) -> &str {
181 &self.id
182 }
183
184 fn start<'a>(
185 &'a self,
186 callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
187 ) -> TriggerFuture<'a, ()> {
188 let state = self.state.clone();
189 let buffer_size = self.buffer_size;
190 let trigger_id = self.id.clone();
191
192 Box::pin(async move {
193 if state.running.load(Ordering::SeqCst) {
194 return Err(XervError::ConfigValue {
195 field: "trigger".to_string(),
196 cause: "Trigger is already running".to_string(),
197 });
198 }
199
200 tracing::info!(
201 trigger_id = %trigger_id,
202 buffer_size = buffer_size,
203 "Queue trigger started"
204 );
205
206 state.running.store(true, Ordering::SeqCst);
207
208 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
209 *state.shutdown_tx.write() = Some(shutdown_tx);
210
211 let (msg_tx, mut msg_rx) = mpsc::channel(buffer_size);
213 *state.message_tx.write() = Some(msg_tx);
214
215 let callback = Arc::new(callback);
216
217 loop {
218 tokio::select! {
219 _ = &mut shutdown_rx => {
220 tracing::info!(trigger_id = %trigger_id, "Queue trigger shutting down");
221 break;
222 }
223 Some(message) = msg_rx.recv() => {
224 if state.paused.load(Ordering::SeqCst) {
225 tracing::debug!(trigger_id = %trigger_id, "Trigger paused, dropping message");
226 continue;
227 }
228
229 let metadata = format!(
230 "payload_size={},key={}",
231 message.payload.len(),
232 message.key.as_deref().unwrap_or("none")
233 );
234
235 let event = TriggerEvent::new(&trigger_id, RelPtr::null())
237 .with_metadata(metadata);
238
239 tracing::debug!(
240 trigger_id = %trigger_id,
241 trace_id = %event.trace_id,
242 payload_size = message.payload.len(),
243 key = ?message.key,
244 "Queue message received"
245 );
246
247 callback(event);
248 }
249 }
250 }
251
252 *state.message_tx.write() = None;
254 state.running.store(false, Ordering::SeqCst);
255 Ok(())
256 })
257 }
258
259 fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
260 let state = self.state.clone();
261 let trigger_id = self.id.clone();
262
263 Box::pin(async move {
264 if let Some(tx) = state.shutdown_tx.write().take() {
265 let _ = tx.send(());
266 tracing::info!(trigger_id = %trigger_id, "Queue trigger stopped");
267 }
268 *state.message_tx.write() = None;
269 state.running.store(false, Ordering::SeqCst);
270 Ok(())
271 })
272 }
273
274 fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
275 let state = self.state.clone();
276 let trigger_id = self.id.clone();
277
278 Box::pin(async move {
279 state.paused.store(true, Ordering::SeqCst);
280 tracing::info!(trigger_id = %trigger_id, "Queue trigger paused");
281 Ok(())
282 })
283 }
284
285 fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
286 let state = self.state.clone();
287 let trigger_id = self.id.clone();
288
289 Box::pin(async move {
290 state.paused.store(false, Ordering::SeqCst);
291 tracing::info!(trigger_id = %trigger_id, "Queue trigger resumed");
292 Ok(())
293 })
294 }
295
296 fn is_running(&self) -> bool {
297 self.state.running.load(Ordering::SeqCst)
298 }
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// A new trigger reports its id and type and is not running.
    #[test]
    fn queue_trigger_creation() {
        let subject = QueueTrigger::new("test_queue");

        assert_eq!(subject.id(), "test_queue");
        assert_eq!(subject.trigger_type(), TriggerType::Queue);
        assert!(!subject.is_running());
    }

    /// `buffer_size` from the config parameters ends up on the trigger.
    #[test]
    fn queue_trigger_from_config() {
        let name = serde_yaml::Value::String("buffer_size".to_string());
        let capacity = serde_yaml::Value::Number(500.into());

        let mut params = serde_yaml::Mapping::new();
        params.insert(name, capacity);

        let config = TriggerConfig::new("queue_test", TriggerType::Queue)
            .with_params(serde_yaml::Value::Mapping(params));

        let subject = QueueTrigger::from_config(&config).unwrap();

        assert_eq!(subject.id(), "queue_test");
        assert_eq!(subject.buffer_size, 500);
    }

    /// Builder methods set the key and append a header.
    #[test]
    fn queue_message_creation() {
        let built = QueueMessage::new(vec![1, 2, 3])
            .with_key("test-key")
            .with_header("content-type", "application/json");

        assert_eq!(built.payload, vec![1, 2, 3]);
        assert_eq!(built.key.as_deref(), Some("test-key"));
        assert_eq!(built.headers.len(), 1);
    }

    /// String construction stores the UTF-8 bytes as the payload.
    #[test]
    fn queue_message_from_string() {
        let built = QueueMessage::from_string("hello world");

        assert_eq!(built.payload, b"hello world".to_vec());
    }

    /// JSON construction yields a non-empty payload.
    #[test]
    fn queue_message_from_json() {
        let document = serde_json::json!({"key": "value"});
        let built = QueueMessage::from_json(&document).unwrap();

        assert!(!built.payload.is_empty());
    }

    /// No handle is available until the trigger has been started.
    #[test]
    fn queue_trigger_no_handle_before_start() {
        let subject = QueueTrigger::new("test");

        assert!(subject.handle().is_none());
    }
}