1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
/// Implemented by types that carry a UTC timestamp.
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
26
/// Kind of component, as recorded in [`ComponentEvent::component_type`].
///
/// Derives `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A query component.
    Query,
    /// A reaction component.
    Reaction,
}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161 Starting,
162 Running,
163 Stopping,
164 Stopped,
165 Reconfiguring,
166 Error,
167}
168
/// A [`SourceChange`] paired with a source id and a UTC timestamp.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Id of the source this change is attributed to.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
175
/// Control payload carried by [`SourceEvent::Control`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// A subscription control record.
    Subscription {
        /// Id of the query the record refers to.
        query_id: String,
        /// Id of the query node (presumably the node hosting the query — TODO confirm).
        query_node_id: String,
        /// Node labels attached to the subscription.
        node_labels: Vec<String>,
        /// Relation labels attached to the subscription.
        rel_labels: Vec<String>,
        /// Operation applied to the subscription record.
        operation: ControlOperation,
    },
}
188
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
191pub enum ControlOperation {
192 Insert,
193 Update,
194 Delete,
195}
196
/// Event payload carried by [`SourceEventWrapper::event`].
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// A data change.
    Change(SourceChange),
    /// A control record.
    Control(SourceControl),
    /// Marks the start of a bootstrap for the given query id.
    BootstrapStart { query_id: String },
    /// Marks the end of a bootstrap for the given query id.
    BootstrapEnd { query_id: String },
}
209
/// A [`SourceEvent`] together with its source id, a UTC timestamp and
/// optional profiling metadata.
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Id of the source this event is attributed to.
    pub source_id: String,
    /// The wrapped event.
    pub event: SourceEvent,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Profiling metadata; `None` when the wrapper was built with [`SourceEventWrapper::new`].
    pub profiling: Option<ProfilingMetadata>,
}
219
220impl SourceEventWrapper {
221 pub fn new(
223 source_id: String,
224 event: SourceEvent,
225 timestamp: chrono::DateTime<chrono::Utc>,
226 ) -> Self {
227 Self {
228 source_id,
229 event,
230 timestamp,
231 profiling: None,
232 }
233 }
234
235 pub fn with_profiling(
237 source_id: String,
238 event: SourceEvent,
239 timestamp: chrono::DateTime<chrono::Utc>,
240 profiling: ProfilingMetadata,
241 ) -> Self {
242 Self {
243 source_id,
244 event,
245 timestamp,
246 profiling: Some(profiling),
247 }
248 }
249
250 pub fn into_parts(
253 self,
254 ) -> (
255 String,
256 SourceEvent,
257 chrono::DateTime<chrono::Utc>,
258 Option<ProfilingMetadata>,
259 ) {
260 (self.source_id, self.event, self.timestamp, self.profiling)
261 }
262
263 pub fn try_unwrap_arc(
270 arc_self: Arc<Self>,
271 ) -> Result<
272 (
273 String,
274 SourceEvent,
275 chrono::DateTime<chrono::Utc>,
276 Option<ProfilingMetadata>,
277 ),
278 Arc<Self>,
279 > {
280 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
281 }
282}
283
284impl Timestamped for SourceEventWrapper {
286 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
287 self.timestamp
288 }
289}
290
/// Reference-counted [`SourceEventWrapper`]; the item type of
/// [`SourceBroadcastSender`] and [`SourceBroadcastReceiver`].
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
293
/// Item type of the bootstrap event channel ([`BootstrapEventSender`] /
/// [`BootstrapEventReceiver`]).
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Id of the source this event is attributed to.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of the event (numbering scheme is set by the producer — not defined here).
    pub sequence: u64,
}
302
/// Item type of the bootstrap completion channel ([`BootstrapCompleteSender`] /
/// [`BootstrapCompleteReceiver`]).
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Id of the source the completion refers to.
    pub source_id: String,
    /// Total event count reported with the completion.
    pub total_events: u64,
}
309
/// Parameters of a subscription request naming a query and a source.
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Id of the subscribing query.
    pub query_id: String,
    /// Id of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is requested for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels included in the request.
    pub node_labels: Vec<String>,
    /// Relation labels included in the request.
    pub relation_labels: Vec<String>,
}
319
/// Response to a subscription, carrying the receivers handed to the subscriber.
///
/// Derives no traits (unlike most types in this file); it holds a boxed trait
/// object.
pub struct SubscriptionResponse {
    /// Id of the subscribing query.
    pub query_id: String,
    /// Id of the subscribed source.
    pub source_id: String,
    /// Boxed `super::ChangeReceiver` trait object parameterized over [`SourceEventWrapper`].
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Optional receiver of [`BootstrapEvent`]s.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
}
327
/// Response to a query subscription, carrying the receiver handed to the subscriber.
pub struct QuerySubscriptionResponse {
    /// Id of the subscribed query.
    pub query_id: String,
    /// Boxed `super::ChangeReceiver` trait object parameterized over [`QueryResult`].
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
333
/// One entry of [`QueryResult::results`].
///
/// `#[serde(tag = "type")]` makes this an internally tagged enum: the variant
/// tag is written to a `type` field next to the variant's own fields. Note that
/// the tags use mixed casing (`"ADD"`, `"DELETE"`, `"UPDATE"` versus
/// `"aggregation"`, `"noop"`); they are part of the serialized format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// Tagged `"ADD"`; carries a single JSON value.
    #[serde(rename = "ADD")]
    Add { data: serde_json::Value },
    /// Tagged `"DELETE"`; carries a single JSON value.
    #[serde(rename = "DELETE")]
    Delete { data: serde_json::Value },
    /// Tagged `"UPDATE"`; carries `data` plus `before`/`after` JSON values.
    #[serde(rename = "UPDATE")]
    Update {
        data: serde_json::Value,
        before: serde_json::Value,
        after: serde_json::Value,
        // Omitted from the serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
    },
    /// Tagged `"aggregation"`; `before` is optional, `after` is always present.
    #[serde(rename = "aggregation")]
    Aggregation {
        before: Option<serde_json::Value>,
        after: serde_json::Value,
    },
    /// Tagged `"noop"`; carries no data.
    #[serde(rename = "noop")]
    Noop,
}
358
/// Result emitted for a query: a list of [`ResultDiff`]s plus metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Id of the query the result belongs to.
    pub query_id: String,
    /// UTC timestamp attached to the result.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The diffs that make up this result.
    pub results: Vec<ResultDiff>,
    /// Free-form metadata keyed by string.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Profiling metadata; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
369
370impl QueryResult {
371 pub fn new(
373 query_id: String,
374 timestamp: chrono::DateTime<chrono::Utc>,
375 results: Vec<ResultDiff>,
376 metadata: HashMap<String, serde_json::Value>,
377 ) -> Self {
378 Self {
379 query_id,
380 timestamp,
381 results,
382 metadata,
383 profiling: None,
384 }
385 }
386
387 pub fn with_profiling(
389 query_id: String,
390 timestamp: chrono::DateTime<chrono::Utc>,
391 results: Vec<ResultDiff>,
392 metadata: HashMap<String, serde_json::Value>,
393 profiling: ProfilingMetadata,
394 ) -> Self {
395 Self {
396 query_id,
397 timestamp,
398 results,
399 metadata,
400 profiling: Some(profiling),
401 }
402 }
403}
404
405impl Timestamped for QueryResult {
407 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
408 self.timestamp
409 }
410}
411
/// Reference-counted [`QueryResult`]; the item type of
/// [`QueryResultBroadcastSender`] and [`QueryResultBroadcastReceiver`].
pub type ArcQueryResult = Arc<QueryResult>;
414
/// Status report for a single component; the item type of
/// [`ComponentEventSender`] / [`ComponentEventReceiver`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Id of the component the event refers to.
    pub component_id: String,
    /// Kind of the component.
    pub component_type: ComponentType,
    /// Status being reported.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional free-form message accompanying the status.
    pub message: Option<String>,
}
423
/// Item type of [`ControlMessageSender`] / [`ControlMessageReceiver`].
///
/// NOTE(review): the `String` payloads are presumably component ids — confirm
/// against the code that produces and consumes these messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Start request with a string payload.
    Start(String),
    /// Stop request with a string payload.
    Stop(String),
    /// Status request with a string payload.
    Status(String),
    /// Shutdown request; carries no payload.
    Shutdown,
}
431
/// Receiving half of a tokio `mpsc` channel of [`ComponentEvent`]s.
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Sending half of a tokio `mpsc` channel of [`ComponentEvent`]s.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving half of a tokio `mpsc` channel of [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of a tokio `mpsc` channel of [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
436
/// Sending half of a tokio `broadcast` channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Receiving half of a tokio `broadcast` channel of [`ArcSourceEvent`]s.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
440
/// Sending half of a tokio `broadcast` channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Receiving half of a tokio `broadcast` channel of [`ArcQueryResult`]s.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
444
/// Sending half of a tokio `mpsc` channel of [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of a tokio `mpsc` channel of [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
/// Sending half of a tokio `mpsc` channel of [`BootstrapComplete`] notices.
pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
/// Receiving half of a tokio `mpsc` channel of [`BootstrapComplete`] notices.
pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
450
/// Signal about a query, wrapped by [`ControlSignalWrapper`] for transport.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// The query with the given id is running.
    Running { query_id: String },
    /// The query with the given id is stopped.
    Stopped { query_id: String },
    /// The query with the given id is deleted.
    Deleted { query_id: String },
}
461
/// A [`ControlSignal`] with a UTC timestamp and an optional sequence number;
/// the item type of [`ControlSignalSender`] / [`ControlSignalReceiver`].
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped signal.
    pub signal: ControlSignal,
    /// UTC timestamp; both constructors set it to the time of construction.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number; `None` unless built with [`ControlSignalWrapper::with_sequence`].
    pub sequence_number: Option<u64>,
}
469
470impl ControlSignalWrapper {
471 pub fn new(signal: ControlSignal) -> Self {
472 Self {
473 signal,
474 timestamp: chrono::Utc::now(),
475 sequence_number: None,
476 }
477 }
478
479 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
480 Self {
481 signal,
482 timestamp: chrono::Utc::now(),
483 sequence_number: Some(sequence_number),
484 }
485 }
486}
487
/// Receiving half of a tokio `mpsc` channel of [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of a tokio `mpsc` channel of [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
490
/// Sending halves of the channels created by [`EventChannels::new`]; the
/// matching receiving halves are returned in [`EventReceivers`].
pub struct EventChannels {
    /// Sender for [`ComponentEvent`]s.
    pub component_event_tx: ComponentEventSender,
    /// Sender for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for [`ControlSignalWrapper`]s.
    pub control_signal_tx: ControlSignalSender,
}
496
/// Receiving halves of the channels created by [`EventChannels::new`].
pub struct EventReceivers {
    /// Receiver for [`ComponentEvent`]s.
    pub component_event_rx: ComponentEventReceiver,
    /// Receiver for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for [`ControlSignalWrapper`]s.
    pub control_signal_rx: ControlSignalReceiver,
}
502
503impl EventChannels {
504 pub fn new() -> (Self, EventReceivers) {
505 let (component_event_tx, component_event_rx) = mpsc::channel(1000);
506 let (control_tx, control_rx) = mpsc::channel(100);
507 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
508
509 let channels = Self {
510 component_event_tx,
511 _control_tx: control_tx,
512 control_signal_tx,
513 };
514
515 let receivers = EventReceivers {
516 component_event_rx,
517 _control_rx: control_rx,
518 control_signal_rx,
519 };
520
521 (channels, receivers)
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds an `Insert` change for a single labelled test node.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Builds the wrapper every test starts from: a `Change` event from
    /// "test-source" stamped with the current time and no profiling data.
    fn sample_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = sample_wrapper().into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        // Only one strong reference exists, so unwrapping must succeed.
        let sole_owner = Arc::new(sample_wrapper());

        let outcome = SourceEventWrapper::try_unwrap_arc(sole_owner);
        assert!(outcome.is_ok());

        let (source_id, event, _timestamp, _profiling) = outcome.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(sample_wrapper());
        // A second strong reference keeps the Arc shared for the whole test.
        let _second = first.clone();

        let outcome = SourceEventWrapper::try_unwrap_arc(first);
        assert!(outcome.is_err());

        // The original Arc is handed back untouched.
        let returned = outcome.unwrap_err();
        assert_eq!(returned.source_id, "test-source");
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        let arc = Arc::new(sample_wrapper());

        // Take ownership when possible; fall back to cloning the fields when
        // the Arc is still shared.
        let (source_id, event, _timestamp, _profiling) =
            SourceEventWrapper::try_unwrap_arc(arc).unwrap_or_else(|shared| {
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                )
            });

        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }
}