xerv_nodes/triggers/
memory.rs

1//! Memory trigger (benchmarking).
2//!
3//! Direct memory injection trigger for performance testing and benchmarks.
4//! Events are injected directly without any I/O overhead.
5
6use parking_lot::RwLock;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use tokio::sync::mpsc;
10use xerv_core::error::{Result, XervError};
11use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
12use xerv_core::types::RelPtr;
13
14/// State for the memory trigger.
15struct MemoryState {
16    /// Whether the trigger is running.
17    running: AtomicBool,
18    /// Whether the trigger is paused.
19    paused: AtomicBool,
20    /// Shutdown signal sender.
21    shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
22    /// Event injection channel.
23    inject_tx: RwLock<Option<mpsc::Sender<RelPtr<()>>>>,
24    /// Event counter for statistics.
25    event_count: AtomicU64,
26}
27
28/// Handle for injecting events into a memory trigger.
29#[derive(Clone)]
30pub struct MemoryInjector {
31    tx: mpsc::Sender<RelPtr<()>>,
32}
33
34impl MemoryInjector {
35    /// Inject an event with the given data pointer.
36    pub async fn inject(&self, data: RelPtr<()>) -> Result<()> {
37        self.tx.send(data).await.map_err(|e| XervError::Network {
38            cause: format!("Failed to inject event: {}", e),
39        })
40    }
41
42    /// Inject an event with a null data pointer.
43    pub async fn inject_empty(&self) -> Result<()> {
44        self.inject(RelPtr::null()).await
45    }
46
47    /// Inject multiple events.
48    pub async fn inject_batch(&self, count: usize) -> Result<()> {
49        for _ in 0..count {
50            self.inject_empty().await?;
51        }
52        Ok(())
53    }
54}
55
56/// Memory trigger for benchmarking.
57///
58/// Provides zero-overhead event injection for performance testing.
59/// Events are injected directly via `MemoryInjector`.
60///
61/// # Configuration
62///
63/// ```yaml
64/// triggers:
65///   - id: bench_trigger
66///     type: trigger::memory
67///     params:
68///       buffer_size: 10000
69/// ```
70///
71/// # Parameters
72///
73/// - `buffer_size` - Maximum events to buffer (default: 1000)
74///
75/// # Usage
76///
77/// ```ignore
78/// let trigger = MemoryTrigger::new("bench");
79/// // Start the trigger...
80/// let injector = trigger.injector().unwrap();
81///
82/// // Inject events for benchmarking
83/// for _ in 0..1000 {
84///     injector.inject_empty().await?;
85/// }
86/// ```
87pub struct MemoryTrigger {
88    /// Trigger ID.
89    id: String,
90    /// Buffer size.
91    buffer_size: usize,
92    /// Internal state.
93    state: Arc<MemoryState>,
94}
95
96impl MemoryTrigger {
97    /// Create a new memory trigger.
98    pub fn new(id: impl Into<String>) -> Self {
99        Self {
100            id: id.into(),
101            buffer_size: 1000,
102            state: Arc::new(MemoryState {
103                running: AtomicBool::new(false),
104                paused: AtomicBool::new(false),
105                shutdown_tx: RwLock::new(None),
106                inject_tx: RwLock::new(None),
107                event_count: AtomicU64::new(0),
108            }),
109        }
110    }
111
112    /// Create from configuration.
113    pub fn from_config(config: &TriggerConfig) -> Result<Self> {
114        let buffer_size = config.get_i64("buffer_size").unwrap_or(1000) as usize;
115
116        Ok(Self {
117            id: config.id.clone(),
118            buffer_size,
119            state: Arc::new(MemoryState {
120                running: AtomicBool::new(false),
121                paused: AtomicBool::new(false),
122                shutdown_tx: RwLock::new(None),
123                inject_tx: RwLock::new(None),
124                event_count: AtomicU64::new(0),
125            }),
126        })
127    }
128
129    /// Set the buffer size.
130    pub fn with_buffer_size(mut self, size: usize) -> Self {
131        self.buffer_size = size;
132        self
133    }
134
135    /// Get an injector for sending events.
136    ///
137    /// Returns `None` if the trigger hasn't been started.
138    pub fn injector(&self) -> Option<MemoryInjector> {
139        self.state
140            .inject_tx
141            .read()
142            .as_ref()
143            .map(|tx| MemoryInjector { tx: tx.clone() })
144    }
145
146    /// Get the number of events processed.
147    pub fn event_count(&self) -> u64 {
148        self.state.event_count.load(Ordering::SeqCst)
149    }
150
151    /// Reset the event counter.
152    pub fn reset_count(&self) {
153        self.state.event_count.store(0, Ordering::SeqCst);
154    }
155}
156
157impl Trigger for MemoryTrigger {
158    fn trigger_type(&self) -> TriggerType {
159        TriggerType::Memory
160    }
161
162    fn id(&self) -> &str {
163        &self.id
164    }
165
166    fn start<'a>(
167        &'a self,
168        callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
169    ) -> TriggerFuture<'a, ()> {
170        let state = self.state.clone();
171        let buffer_size = self.buffer_size;
172        let trigger_id = self.id.clone();
173
174        Box::pin(async move {
175            if state.running.load(Ordering::SeqCst) {
176                return Err(XervError::ConfigValue {
177                    field: "trigger".to_string(),
178                    cause: "Trigger is already running".to_string(),
179                });
180            }
181
182            tracing::info!(
183                trigger_id = %trigger_id,
184                buffer_size = buffer_size,
185                "Memory trigger started"
186            );
187
188            state.running.store(true, Ordering::SeqCst);
189            state.event_count.store(0, Ordering::SeqCst);
190
191            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
192            *state.shutdown_tx.write() = Some(shutdown_tx);
193
194            // Create injection channel
195            let (inject_tx, mut inject_rx) = mpsc::channel(buffer_size);
196            *state.inject_tx.write() = Some(inject_tx);
197
198            let callback = Arc::new(callback);
199
200            loop {
201                tokio::select! {
202                    _ = &mut shutdown_rx => {
203                        tracing::info!(
204                            trigger_id = %trigger_id,
205                            total_events = state.event_count.load(Ordering::SeqCst),
206                            "Memory trigger shutting down"
207                        );
208                        break;
209                    }
210                    Some(data) = inject_rx.recv() => {
211                        if state.paused.load(Ordering::SeqCst) {
212                            continue;
213                        }
214
215                        // Increment counter
216                        let count = state.event_count.fetch_add(1, Ordering::SeqCst) + 1;
217
218                        // Create event
219                        let event = TriggerEvent::new(&trigger_id, data)
220                            .with_metadata(format!("event_number={}", count));
221
222                        callback(event);
223                    }
224                }
225            }
226
227            // Clean up
228            *state.inject_tx.write() = None;
229            state.running.store(false, Ordering::SeqCst);
230            Ok(())
231        })
232    }
233
234    fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
235        let state = self.state.clone();
236        let trigger_id = self.id.clone();
237
238        Box::pin(async move {
239            if let Some(tx) = state.shutdown_tx.write().take() {
240                let _ = tx.send(());
241                tracing::info!(trigger_id = %trigger_id, "Memory trigger stopped");
242            }
243            *state.inject_tx.write() = None;
244            state.running.store(false, Ordering::SeqCst);
245            Ok(())
246        })
247    }
248
249    fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
250        let state = self.state.clone();
251        let trigger_id = self.id.clone();
252
253        Box::pin(async move {
254            state.paused.store(true, Ordering::SeqCst);
255            tracing::info!(trigger_id = %trigger_id, "Memory trigger paused");
256            Ok(())
257        })
258    }
259
260    fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
261        let state = self.state.clone();
262        let trigger_id = self.id.clone();
263
264        Box::pin(async move {
265            state.paused.store(false, Ordering::SeqCst);
266            tracing::info!(trigger_id = %trigger_id, "Memory trigger resumed");
267            Ok(())
268        })
269    }
270
271    fn is_running(&self) -> bool {
272        self.state.running.load(Ordering::SeqCst)
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built trigger exposes its id and type and is idle.
    #[test]
    fn memory_trigger_creation() {
        let fresh = MemoryTrigger::new("bench_trigger");

        assert_eq!(fresh.id(), "bench_trigger");
        assert_eq!(fresh.trigger_type(), TriggerType::Memory);
        assert_eq!(fresh.event_count(), 0);
        assert!(!fresh.is_running());
    }

    /// `from_config` picks up the id and the `buffer_size` parameter.
    #[test]
    fn memory_trigger_from_config() {
        let key = serde_yaml::Value::String("buffer_size".to_string());
        let value = serde_yaml::Value::Number(5000.into());

        let mut mapping = serde_yaml::Mapping::new();
        mapping.insert(key, value);
        let params = serde_yaml::Value::Mapping(mapping);

        let config = TriggerConfig::new("mem_test", TriggerType::Memory).with_params(params);

        let built = MemoryTrigger::from_config(&config).unwrap();
        assert_eq!(built.buffer_size, 5000);
        assert_eq!(built.id(), "mem_test");
    }

    /// No injector is available until the trigger has been started.
    #[test]
    fn memory_trigger_no_injector_before_start() {
        let idle = MemoryTrigger::new("test");
        let handle = idle.injector();
        assert!(handle.is_none());
    }

    /// `reset_count` brings a non-zero counter back to zero.
    #[test]
    fn memory_trigger_reset_count() {
        let subject = MemoryTrigger::new("test");

        subject.state.event_count.store(100, Ordering::SeqCst);
        assert_eq!(subject.event_count(), 100);

        subject.reset_count();
        assert_eq!(subject.event_count(), 0);
    }
}