xerv_nodes/triggers/
memory.rs1use parking_lot::RwLock;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use tokio::sync::mpsc;
10use xerv_core::error::{Result, XervError};
11use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
12use xerv_core::types::RelPtr;
13
14struct MemoryState {
16 running: AtomicBool,
18 paused: AtomicBool,
20 shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
22 inject_tx: RwLock<Option<mpsc::Sender<RelPtr<()>>>>,
24 event_count: AtomicU64,
26}
27
28#[derive(Clone)]
30pub struct MemoryInjector {
31 tx: mpsc::Sender<RelPtr<()>>,
32}
33
34impl MemoryInjector {
35 pub async fn inject(&self, data: RelPtr<()>) -> Result<()> {
37 self.tx.send(data).await.map_err(|e| XervError::Network {
38 cause: format!("Failed to inject event: {}", e),
39 })
40 }
41
42 pub async fn inject_empty(&self) -> Result<()> {
44 self.inject(RelPtr::null()).await
45 }
46
47 pub async fn inject_batch(&self, count: usize) -> Result<()> {
49 for _ in 0..count {
50 self.inject_empty().await?;
51 }
52 Ok(())
53 }
54}
55
56pub struct MemoryTrigger {
88 id: String,
90 buffer_size: usize,
92 state: Arc<MemoryState>,
94}
95
96impl MemoryTrigger {
97 pub fn new(id: impl Into<String>) -> Self {
99 Self {
100 id: id.into(),
101 buffer_size: 1000,
102 state: Arc::new(MemoryState {
103 running: AtomicBool::new(false),
104 paused: AtomicBool::new(false),
105 shutdown_tx: RwLock::new(None),
106 inject_tx: RwLock::new(None),
107 event_count: AtomicU64::new(0),
108 }),
109 }
110 }
111
112 pub fn from_config(config: &TriggerConfig) -> Result<Self> {
114 let buffer_size = config.get_i64("buffer_size").unwrap_or(1000) as usize;
115
116 Ok(Self {
117 id: config.id.clone(),
118 buffer_size,
119 state: Arc::new(MemoryState {
120 running: AtomicBool::new(false),
121 paused: AtomicBool::new(false),
122 shutdown_tx: RwLock::new(None),
123 inject_tx: RwLock::new(None),
124 event_count: AtomicU64::new(0),
125 }),
126 })
127 }
128
129 pub fn with_buffer_size(mut self, size: usize) -> Self {
131 self.buffer_size = size;
132 self
133 }
134
135 pub fn injector(&self) -> Option<MemoryInjector> {
139 self.state
140 .inject_tx
141 .read()
142 .as_ref()
143 .map(|tx| MemoryInjector { tx: tx.clone() })
144 }
145
146 pub fn event_count(&self) -> u64 {
148 self.state.event_count.load(Ordering::SeqCst)
149 }
150
151 pub fn reset_count(&self) {
153 self.state.event_count.store(0, Ordering::SeqCst);
154 }
155}
156
157impl Trigger for MemoryTrigger {
158 fn trigger_type(&self) -> TriggerType {
159 TriggerType::Memory
160 }
161
162 fn id(&self) -> &str {
163 &self.id
164 }
165
166 fn start<'a>(
167 &'a self,
168 callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
169 ) -> TriggerFuture<'a, ()> {
170 let state = self.state.clone();
171 let buffer_size = self.buffer_size;
172 let trigger_id = self.id.clone();
173
174 Box::pin(async move {
175 if state.running.load(Ordering::SeqCst) {
176 return Err(XervError::ConfigValue {
177 field: "trigger".to_string(),
178 cause: "Trigger is already running".to_string(),
179 });
180 }
181
182 tracing::info!(
183 trigger_id = %trigger_id,
184 buffer_size = buffer_size,
185 "Memory trigger started"
186 );
187
188 state.running.store(true, Ordering::SeqCst);
189 state.event_count.store(0, Ordering::SeqCst);
190
191 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
192 *state.shutdown_tx.write() = Some(shutdown_tx);
193
194 let (inject_tx, mut inject_rx) = mpsc::channel(buffer_size);
196 *state.inject_tx.write() = Some(inject_tx);
197
198 let callback = Arc::new(callback);
199
200 loop {
201 tokio::select! {
202 _ = &mut shutdown_rx => {
203 tracing::info!(
204 trigger_id = %trigger_id,
205 total_events = state.event_count.load(Ordering::SeqCst),
206 "Memory trigger shutting down"
207 );
208 break;
209 }
210 Some(data) = inject_rx.recv() => {
211 if state.paused.load(Ordering::SeqCst) {
212 continue;
213 }
214
215 let count = state.event_count.fetch_add(1, Ordering::SeqCst) + 1;
217
218 let event = TriggerEvent::new(&trigger_id, data)
220 .with_metadata(format!("event_number={}", count));
221
222 callback(event);
223 }
224 }
225 }
226
227 *state.inject_tx.write() = None;
229 state.running.store(false, Ordering::SeqCst);
230 Ok(())
231 })
232 }
233
234 fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
235 let state = self.state.clone();
236 let trigger_id = self.id.clone();
237
238 Box::pin(async move {
239 if let Some(tx) = state.shutdown_tx.write().take() {
240 let _ = tx.send(());
241 tracing::info!(trigger_id = %trigger_id, "Memory trigger stopped");
242 }
243 *state.inject_tx.write() = None;
244 state.running.store(false, Ordering::SeqCst);
245 Ok(())
246 })
247 }
248
249 fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
250 let state = self.state.clone();
251 let trigger_id = self.id.clone();
252
253 Box::pin(async move {
254 state.paused.store(true, Ordering::SeqCst);
255 tracing::info!(trigger_id = %trigger_id, "Memory trigger paused");
256 Ok(())
257 })
258 }
259
260 fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
261 let state = self.state.clone();
262 let trigger_id = self.id.clone();
263
264 Box::pin(async move {
265 state.paused.store(false, Ordering::SeqCst);
266 tracing::info!(trigger_id = %trigger_id, "Memory trigger resumed");
267 Ok(())
268 })
269 }
270
271 fn is_running(&self) -> bool {
272 self.state.running.load(Ordering::SeqCst)
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed trigger reports its id and type, is idle, and
    /// has counted no events.
    #[test]
    fn memory_trigger_creation() {
        let fresh = MemoryTrigger::new("bench_trigger");

        assert_eq!(fresh.id(), "bench_trigger");
        assert_eq!(fresh.trigger_type(), TriggerType::Memory);
        assert!(!fresh.is_running());
        assert_eq!(fresh.event_count(), 0);
    }

    /// `from_config` takes the id from the config and honours an explicit
    /// `buffer_size` parameter.
    #[test]
    fn memory_trigger_from_config() {
        let key = serde_yaml::Value::String("buffer_size".to_string());
        let value = serde_yaml::Value::Number(5000.into());

        let mut params = serde_yaml::Mapping::new();
        params.insert(key, value);

        let config = TriggerConfig::new("mem_test", TriggerType::Memory)
            .with_params(serde_yaml::Value::Mapping(params));

        let built = MemoryTrigger::from_config(&config).unwrap();
        assert_eq!(built.id(), "mem_test");
        assert_eq!(built.buffer_size, 5000);
    }

    /// No injector is available until the trigger has been started.
    #[test]
    fn memory_trigger_no_injector_before_start() {
        let idle = MemoryTrigger::new("test");
        let handle = idle.injector();
        assert!(handle.is_none());
    }

    /// `reset_count` brings a non-zero counter back to zero.
    #[test]
    fn memory_trigger_reset_count() {
        let counted = MemoryTrigger::new("test");

        // Seed the counter directly; no event loop is running in this test.
        counted.state.event_count.store(100, Ordering::SeqCst);
        assert_eq!(counted.event_count(), 100);

        counted.reset_count();
        assert_eq!(counted.event_count(), 0);
    }
}