1use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{Subscriber, SubscriberId, Subscription};
12use std::collections::HashMap;
13use std::sync::RwLock;
14
15#[derive(Debug, Clone)]
21pub struct EngineConfig {
22 pub max_channels: usize,
23 pub max_subscribers: usize,
24 pub default_channel_config: ChannelConfig,
25 pub cdc_config: CdcConfig,
26}
27
28impl Default for EngineConfig {
29 fn default() -> Self {
30 Self {
31 max_channels: 1000,
32 max_subscribers: 10000,
33 default_channel_config: ChannelConfig::default(),
34 cdc_config: CdcConfig::default(),
35 }
36 }
37}
38
39pub struct StreamingEngine {
45 config: EngineConfig,
46 channels: RwLock<HashMap<ChannelId, Channel>>,
47 subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
48 stats: RwLock<EngineStats>,
49}
50
51impl StreamingEngine {
52 pub fn new() -> Self {
54 Self::with_config(EngineConfig::default())
55 }
56
57 pub fn with_config(config: EngineConfig) -> Self {
59 Self {
60 config,
61 channels: RwLock::new(HashMap::new()),
62 subscribers: RwLock::new(HashMap::new()),
63 stats: RwLock::new(EngineStats::default()),
64 }
65 }
66
67 pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
73 let id = id.into();
74 let mut channels = self.channels.write().unwrap();
75
76 if channels.len() >= self.config.max_channels {
77 return Err(EngineError::TooManyChannels);
78 }
79
80 if channels.contains_key(&id) {
81 return Err(EngineError::ChannelExists(id));
82 }
83
84 let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
85 channels.insert(id, channel);
86
87 Ok(())
88 }
89
90 pub fn create_channel_with_config(
92 &self,
93 id: impl Into<ChannelId>,
94 config: ChannelConfig,
95 ) -> Result<(), EngineError> {
96 let id = id.into();
97 let mut channels = self.channels.write().unwrap();
98
99 if channels.len() >= self.config.max_channels {
100 return Err(EngineError::TooManyChannels);
101 }
102
103 if channels.contains_key(&id) {
104 return Err(EngineError::ChannelExists(id));
105 }
106
107 let channel = Channel::with_config(id.clone(), config);
108 channels.insert(id, channel);
109
110 Ok(())
111 }
112
113 pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
115 let mut channels = self.channels.write().unwrap();
116
117 if channels.remove(id).is_none() {
118 return Err(EngineError::ChannelNotFound(id.clone()));
119 }
120
121 Ok(())
122 }
123
124 pub fn list_channels(&self) -> Vec<ChannelId> {
126 let channels = self.channels.read().unwrap();
127 channels.keys().cloned().collect()
128 }
129
130 pub fn channel_exists(&self, id: &ChannelId) -> bool {
132 let channels = self.channels.read().unwrap();
133 channels.contains_key(id)
134 }
135
136 pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
142 let channels = self.channels.read().unwrap();
143 let channel = channels
144 .get(channel_id)
145 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
146
147 let receivers = channel.publish(event).map_err(EngineError::Channel)?;
148
149 drop(channels);
150
151 {
152 let mut stats = self.stats.write().unwrap();
153 stats.events_published += 1;
154 }
155
156 Ok(receivers)
157 }
158
159 pub fn publish_change(&self, channel_id: &ChannelId, change: ChangeEvent) -> Result<usize, EngineError> {
161 let event = change.to_event();
162 self.publish(channel_id, event)
163 }
164
165 pub fn publish_to_many(
167 &self,
168 channel_ids: &[ChannelId],
169 event: Event,
170 ) -> HashMap<ChannelId, Result<usize, EngineError>> {
171 let mut results = HashMap::new();
172
173 for id in channel_ids {
174 results.insert(id.clone(), self.publish(id, event.clone()));
175 }
176
177 results
178 }
179
180 pub fn subscribe(
186 &self,
187 channel_id: &ChannelId,
188 subscriber_id: impl Into<SubscriberId>,
189 ) -> Result<ChannelReceiver, EngineError> {
190 let subscriber_id = subscriber_id.into();
191 let channels = self.channels.read().unwrap();
192 let channel = channels
193 .get(channel_id)
194 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
195
196 let receiver = channel
197 .subscribe(subscriber_id.clone())
198 .map_err(EngineError::Channel)?;
199
200 drop(channels);
201
202 self.ensure_subscriber(&subscriber_id, channel_id);
203
204 {
205 let mut stats = self.stats.write().unwrap();
206 stats.active_subscriptions += 1;
207 }
208
209 Ok(receiver)
210 }
211
212 pub fn subscribe_with_filter(
214 &self,
215 channel_id: &ChannelId,
216 subscriber_id: impl Into<SubscriberId>,
217 filter: EventFilter,
218 ) -> Result<ChannelReceiver, EngineError> {
219 let subscriber_id = subscriber_id.into();
220 let channels = self.channels.read().unwrap();
221 let channel = channels
222 .get(channel_id)
223 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
224
225 let receiver = channel
226 .subscribe_with_filter(subscriber_id.clone(), filter)
227 .map_err(EngineError::Channel)?;
228
229 drop(channels);
230
231 self.ensure_subscriber(&subscriber_id, channel_id);
232
233 {
234 let mut stats = self.stats.write().unwrap();
235 stats.active_subscriptions += 1;
236 }
237
238 Ok(receiver)
239 }
240
241 pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
243 let channels = self.channels.read().unwrap();
244 if let Some(channel) = channels.get(channel_id) {
245 channel.unsubscribe(subscriber_id);
246 }
247
248 let mut stats = self.stats.write().unwrap();
249 stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
250 }
251
252 fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
253 let mut subscribers = self.subscribers.write().unwrap();
254
255 let subscriber = subscribers
256 .entry(subscriber_id.clone())
257 .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
258
259 let mut subscription = Subscription::new(subscriber_id.clone());
260 subscription.add_channel(channel_id.clone());
261 subscriber.add_subscription(subscription);
262 }
263
264 pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
270 let subscribers = self.subscribers.read().unwrap();
271 subscribers.get(id).cloned()
272 }
273
274 pub fn list_subscribers(&self) -> Vec<SubscriberId> {
276 let subscribers = self.subscribers.read().unwrap();
277 subscribers.keys().cloned().collect()
278 }
279
280 pub fn remove_subscriber(&self, id: &SubscriberId) {
282 let mut subscribers = self.subscribers.write().unwrap();
283 subscribers.remove(id);
284 }
285
286 pub fn get_history(&self, channel_id: &ChannelId, count: usize) -> Result<Vec<Event>, EngineError> {
292 let channels = self.channels.read().unwrap();
293 let channel = channels
294 .get(channel_id)
295 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
296
297 Ok(channel.get_history(count))
298 }
299
300 pub fn get_history_after(
302 &self,
303 channel_id: &ChannelId,
304 timestamp: u64,
305 ) -> Result<Vec<Event>, EngineError> {
306 let channels = self.channels.read().unwrap();
307 let channel = channels
308 .get(channel_id)
309 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
310
311 Ok(channel.get_history_after(timestamp))
312 }
313
314 pub fn stats(&self) -> EngineStats {
320 let stats = self.stats.read().unwrap();
321 stats.clone()
322 }
323
324 pub fn reset_stats(&self) {
326 let mut stats = self.stats.write().unwrap();
327 *stats = EngineStats::default();
328 }
329
330 pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
332 let channels = self.channels.read().unwrap();
333 channels.get(id).map(|c| c.stats())
334 }
335}
336
337impl Default for StreamingEngine {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
343#[derive(Debug, Clone, Default)]
349pub struct EngineStats {
350 pub events_published: u64,
351 pub active_subscriptions: usize,
352 pub channels_created: usize,
353}
354
355#[derive(Debug, Clone)]
361pub enum EngineError {
362 ChannelExists(ChannelId),
363 ChannelNotFound(ChannelId),
364 TooManyChannels,
365 TooManySubscribers,
366 Channel(ChannelError),
367}
368
369impl std::fmt::Display for EngineError {
370 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371 match self {
372 Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
373 Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
374 Self::TooManyChannels => write!(f, "Maximum channels reached"),
375 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
376 Self::Channel(err) => write!(f, "Channel error: {}", err),
377 }
378 }
379}
380
381impl std::error::Error for EngineError {}
382
383#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::event::{EventData, EventType};
391
392 #[test]
393 fn test_engine_creation() {
394 let engine = StreamingEngine::new();
395 assert!(engine.list_channels().is_empty());
396 }
397
398 #[test]
399 fn test_channel_management() {
400 let engine = StreamingEngine::new();
401
402 engine.create_channel("events").unwrap();
403 assert!(engine.channel_exists(&ChannelId::new("events")));
404
405 let channels = engine.list_channels();
406 assert_eq!(channels.len(), 1);
407
408 engine.delete_channel(&ChannelId::new("events")).unwrap();
409 assert!(!engine.channel_exists(&ChannelId::new("events")));
410 }
411
412 #[tokio::test]
413 async fn test_publish_subscribe() {
414 let engine = StreamingEngine::new();
415 engine.create_channel("test").unwrap();
416
417 let channel_id = ChannelId::new("test");
418 let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();
419
420 let event = Event::new(EventType::Created, "source", EventData::String("hello".to_string()));
421 engine.publish(&channel_id, event).unwrap();
422
423 let received = receiver.recv().await.unwrap();
424 assert_eq!(received.source, "source");
425 }
426
427 #[test]
428 fn test_duplicate_channel() {
429 let engine = StreamingEngine::new();
430
431 engine.create_channel("test").unwrap();
432 let result = engine.create_channel("test");
433
434 assert!(matches!(result, Err(EngineError::ChannelExists(_))));
435 }
436
437 #[test]
438 fn test_stats() {
439 let engine = StreamingEngine::new();
440 engine.create_channel("test").unwrap();
441
442 let channel_id = ChannelId::new("test");
443 engine.subscribe(&channel_id, "sub1").unwrap();
444
445 let event = Event::new(EventType::Created, "source", EventData::Null);
446 engine.publish(&channel_id, event).unwrap();
447
448 let stats = engine.stats();
449 assert_eq!(stats.events_published, 1);
450 assert_eq!(stats.active_subscriptions, 1);
451 }
452
453 #[test]
454 fn test_history() {
455 let config = EngineConfig {
456 default_channel_config: ChannelConfig {
457 persistent: true,
458 retention_count: 100,
459 ..Default::default()
460 },
461 ..Default::default()
462 };
463
464 let engine = StreamingEngine::with_config(config);
465 engine.create_channel("history").unwrap();
466
467 let channel_id = ChannelId::new("history");
468
469 for i in 0..5 {
470 let event = Event::new(EventType::Created, "test", EventData::Int(i));
471 engine.publish(&channel_id, event).unwrap();
472 }
473
474 let history = engine.get_history(&channel_id, 10).unwrap();
475 assert_eq!(history.len(), 5);
476 }
477}