mockforge_ws/
ai_event_generator.rs1use axum::extract::ws::{Message, WebSocket};
7use mockforge_data::{ReplayAugmentationConfig, ReplayAugmentationEngine};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tokio::time::{sleep, Duration};
11use tracing::{debug, error, info};
12
13pub struct AiEventGenerator {
15 engine: Arc<RwLock<ReplayAugmentationEngine>>,
17}
18
19impl AiEventGenerator {
20 pub fn new(config: ReplayAugmentationConfig) -> mockforge_core::Result<Self> {
22 debug!("Creating AI event generator");
23 let engine = ReplayAugmentationEngine::new(config)
24 .map_err(|e| mockforge_core::Error::generic(e.to_string()))?;
25 Ok(Self {
26 engine: Arc::new(RwLock::new(engine)),
27 })
28 }
29
30 pub async fn stream_events(&self, mut socket: WebSocket, max_events: Option<usize>) {
35 info!("Starting AI event stream (max_events: {:?})", max_events);
36
37 let events = match self.engine.write().await.generate_stream().await {
39 Ok(events) => events,
40 Err(e) => {
41 error!("Failed to generate event stream: {}", e);
42 return;
43 }
44 };
45
46 info!("Generated {} events from AI engine", events.len());
47
48 let max = max_events.unwrap_or(events.len());
49 let events_to_send = events.into_iter().take(max);
50
51 for event in events_to_send {
52 let message_json = serde_json::json!({
54 "type": event.event_type,
55 "timestamp": event.timestamp.to_rfc3339(),
56 "sequence": event.sequence,
57 "data": event.data
58 });
59
60 let message_str = match serde_json::to_string(&message_json) {
61 Ok(s) => s,
62 Err(e) => {
63 error!("Failed to serialize event: {}", e);
64 continue;
65 }
66 };
67
68 debug!("Sending AI-generated event: {}", message_str);
69
70 if socket.send(Message::Text(message_str.into())).await.is_err() {
72 info!("Client disconnected, stopping event stream");
73 break;
74 }
75
76 sleep(Duration::from_millis(100)).await;
78 }
79
80 info!("AI event stream completed");
81 }
82
83 pub async fn stream_events_with_rate(
85 &self,
86 mut socket: WebSocket,
87 max_events: Option<usize>,
88 events_per_second: f64,
89 ) {
90 info!(
91 "Starting AI event stream (max_events: {:?}, rate: {} events/sec)",
92 max_events, events_per_second
93 );
94
95 let events = match self.engine.write().await.generate_stream().await {
97 Ok(events) => events,
98 Err(e) => {
99 error!("Failed to generate event stream: {}", e);
100 return;
101 }
102 };
103
104 info!("Generated {} events from AI engine", events.len());
105
106 let delay_ms = (1000.0 / events_per_second) as u64;
107 let max = max_events.unwrap_or(events.len());
108 let events_to_send = events.into_iter().take(max);
109
110 for event in events_to_send {
111 let message_json = serde_json::json!({
113 "type": event.event_type,
114 "timestamp": event.timestamp.to_rfc3339(),
115 "sequence": event.sequence,
116 "data": event.data
117 });
118
119 let message_str = match serde_json::to_string(&message_json) {
120 Ok(s) => s,
121 Err(e) => {
122 error!("Failed to serialize event: {}", e);
123 continue;
124 }
125 };
126
127 debug!("Sending AI-generated event: {}", message_str);
128
129 if socket.send(Message::Text(message_str.into())).await.is_err() {
131 info!("Client disconnected, stopping event stream");
132 break;
133 }
134
135 sleep(Duration::from_millis(delay_ms)).await;
137 }
138
139 info!("AI event stream completed");
140 }
141}
142
143#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
145pub struct WebSocketAiConfig {
146 pub enabled: bool,
148 pub replay: Option<ReplayAugmentationConfig>,
150 pub max_events: Option<usize>,
152 pub event_rate: Option<f64>,
154}
155
156impl Default for WebSocketAiConfig {
157 fn default() -> Self {
158 Self {
159 enabled: false,
160 replay: None,
161 max_events: Some(100),
162 event_rate: Some(1.0),
163 }
164 }
165}
166
167impl WebSocketAiConfig {
168 pub fn is_enabled(&self) -> bool {
170 self.enabled && self.replay.is_some()
171 }
172
173 pub fn create_generator(&self) -> mockforge_core::Result<Option<AiEventGenerator>> {
175 if let Some(replay_config) = &self.replay {
176 let generator = AiEventGenerator::new(replay_config.clone())?;
177 Ok(Some(generator))
178 } else {
179 Ok(None)
180 }
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use mockforge_data::{EventStrategy, ReplayMode};

    /// The default configuration is disabled and carries the documented
    /// event cap and rate.
    #[test]
    fn test_websocket_ai_config_default() {
        let defaults = WebSocketAiConfig::default();

        assert!(!defaults.is_enabled());
        assert_eq!(defaults.max_events, Some(100));
        assert_eq!(defaults.event_rate, Some(1.0));
    }

    /// `is_enabled` needs both the flag and a replay configuration.
    #[test]
    fn test_websocket_ai_config_is_enabled() {
        // Flag set, but no replay configuration yet: still not enabled.
        let without_replay = WebSocketAiConfig {
            enabled: true,
            ..Default::default()
        };
        assert!(!without_replay.is_enabled());

        // Same settings plus a replay configuration: enabled.
        let replay = ReplayAugmentationConfig {
            mode: ReplayMode::Generated,
            strategy: EventStrategy::CountBased,
            ..Default::default()
        };
        let with_replay = WebSocketAiConfig {
            replay: Some(replay),
            ..without_replay
        };
        assert!(with_replay.is_enabled());
    }
}