1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
22pub trait Timestamped {
24 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
31pub enum ComponentType {
32 Source,
33 Query,
34 Reaction,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161 Starting,
162 Running,
163 Stopping,
164 Stopped,
165 Reconfiguring,
166 Error,
167}
168
169#[derive(Debug, Clone)]
170pub struct SourceChangeEvent {
171 pub source_id: String,
172 pub change: SourceChange,
173 pub timestamp: chrono::DateTime<chrono::Utc>,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
178pub enum SourceControl {
179 Subscription {
181 query_id: String,
182 query_node_id: String,
183 node_labels: Vec<String>,
184 rel_labels: Vec<String>,
185 operation: ControlOperation,
186 },
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
191pub enum ControlOperation {
192 Insert,
193 Update,
194 Delete,
195}
196
197#[derive(Debug, Clone)]
199pub enum SourceEvent {
200 Change(SourceChange),
202 Control(SourceControl),
204}
205
206#[derive(Debug, Clone)]
208pub struct SourceEventWrapper {
209 pub source_id: String,
210 pub event: SourceEvent,
211 pub timestamp: chrono::DateTime<chrono::Utc>,
212 pub profiling: Option<ProfilingMetadata>,
214}
215
216impl SourceEventWrapper {
217 pub fn new(
219 source_id: String,
220 event: SourceEvent,
221 timestamp: chrono::DateTime<chrono::Utc>,
222 ) -> Self {
223 Self {
224 source_id,
225 event,
226 timestamp,
227 profiling: None,
228 }
229 }
230
231 pub fn with_profiling(
233 source_id: String,
234 event: SourceEvent,
235 timestamp: chrono::DateTime<chrono::Utc>,
236 profiling: ProfilingMetadata,
237 ) -> Self {
238 Self {
239 source_id,
240 event,
241 timestamp,
242 profiling: Some(profiling),
243 }
244 }
245
246 pub fn into_parts(
249 self,
250 ) -> (
251 String,
252 SourceEvent,
253 chrono::DateTime<chrono::Utc>,
254 Option<ProfilingMetadata>,
255 ) {
256 (self.source_id, self.event, self.timestamp, self.profiling)
257 }
258
259 pub fn try_unwrap_arc(
266 arc_self: Arc<Self>,
267 ) -> Result<
268 (
269 String,
270 SourceEvent,
271 chrono::DateTime<chrono::Utc>,
272 Option<ProfilingMetadata>,
273 ),
274 Arc<Self>,
275 > {
276 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
277 }
278}
279
280impl Timestamped for SourceEventWrapper {
282 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
283 self.timestamp
284 }
285}
286
287pub type ArcSourceEvent = Arc<SourceEventWrapper>;
289
290#[derive(Debug, Clone)]
292pub struct BootstrapEvent {
293 pub source_id: String,
294 pub change: SourceChange,
295 pub timestamp: chrono::DateTime<chrono::Utc>,
296 pub sequence: u64,
297}
298
299#[derive(Debug, Clone)]
301pub struct BootstrapComplete {
302 pub source_id: String,
303 pub total_events: u64,
304}
305
306#[derive(Debug, Clone)]
308pub struct SubscriptionRequest {
309 pub query_id: String,
310 pub source_id: String,
311 pub enable_bootstrap: bool,
312 pub node_labels: Vec<String>,
313 pub relation_labels: Vec<String>,
314}
315
316pub struct SubscriptionResponse {
318 pub query_id: String,
319 pub source_id: String,
320 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
321 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
322}
323
324pub struct QuerySubscriptionResponse {
326 pub query_id: String,
327 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
332#[serde(tag = "type")]
333pub enum ResultDiff {
334 #[serde(rename = "ADD")]
335 Add { data: serde_json::Value },
336 #[serde(rename = "DELETE")]
337 Delete { data: serde_json::Value },
338 #[serde(rename = "UPDATE")]
339 Update {
340 data: serde_json::Value,
341 before: serde_json::Value,
342 after: serde_json::Value,
343 #[serde(skip_serializing_if = "Option::is_none")]
344 grouping_keys: Option<Vec<String>>,
345 },
346 #[serde(rename = "aggregation")]
347 Aggregation {
348 before: Option<serde_json::Value>,
349 after: serde_json::Value,
350 },
351 #[serde(rename = "noop")]
352 Noop,
353}
354
355#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct QueryResult {
357 pub query_id: String,
358 pub timestamp: chrono::DateTime<chrono::Utc>,
359 pub results: Vec<ResultDiff>,
360 pub metadata: HashMap<String, serde_json::Value>,
361 #[serde(skip_serializing_if = "Option::is_none")]
363 pub profiling: Option<ProfilingMetadata>,
364}
365
366impl QueryResult {
367 pub fn new(
369 query_id: String,
370 timestamp: chrono::DateTime<chrono::Utc>,
371 results: Vec<ResultDiff>,
372 metadata: HashMap<String, serde_json::Value>,
373 ) -> Self {
374 Self {
375 query_id,
376 timestamp,
377 results,
378 metadata,
379 profiling: None,
380 }
381 }
382
383 pub fn with_profiling(
385 query_id: String,
386 timestamp: chrono::DateTime<chrono::Utc>,
387 results: Vec<ResultDiff>,
388 metadata: HashMap<String, serde_json::Value>,
389 profiling: ProfilingMetadata,
390 ) -> Self {
391 Self {
392 query_id,
393 timestamp,
394 results,
395 metadata,
396 profiling: Some(profiling),
397 }
398 }
399}
400
401impl Timestamped for QueryResult {
403 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
404 self.timestamp
405 }
406}
407
408pub type ArcQueryResult = Arc<QueryResult>;
410
411#[derive(Debug, Clone, Serialize, Deserialize)]
412pub struct ComponentEvent {
413 pub component_id: String,
414 pub component_type: ComponentType,
415 pub status: ComponentStatus,
416 pub timestamp: chrono::DateTime<chrono::Utc>,
417 pub message: Option<String>,
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub enum ControlMessage {
422 Start(String),
423 Stop(String),
424 Status(String),
425 Shutdown,
426}
427
428pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
429pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
430pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
431pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
432
433pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
435pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
436
437pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
439pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
440
441pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
443pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
444pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
445pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
449pub enum ControlSignal {
450 Running { query_id: String },
452 Stopped { query_id: String },
454 Deleted { query_id: String },
456}
457
458#[derive(Debug, Clone)]
460pub struct ControlSignalWrapper {
461 pub signal: ControlSignal,
462 pub timestamp: chrono::DateTime<chrono::Utc>,
463 pub sequence_number: Option<u64>,
464}
465
466impl ControlSignalWrapper {
467 pub fn new(signal: ControlSignal) -> Self {
468 Self {
469 signal,
470 timestamp: chrono::Utc::now(),
471 sequence_number: None,
472 }
473 }
474
475 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
476 Self {
477 signal,
478 timestamp: chrono::Utc::now(),
479 sequence_number: Some(sequence_number),
480 }
481 }
482}
483
484pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
485pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
486
487pub struct EventChannels {
488 pub component_event_tx: ComponentEventSender,
489 pub _control_tx: ControlMessageSender,
490 pub control_signal_tx: ControlSignalSender,
491}
492
493pub struct EventReceivers {
494 pub component_event_rx: ComponentEventReceiver,
495 pub _control_rx: ControlMessageReceiver,
496 pub control_signal_rx: ControlSignalReceiver,
497}
498
499impl EventChannels {
500 pub fn new() -> (Self, EventReceivers) {
501 let (component_event_tx, component_event_rx) = mpsc::channel(1000);
502 let (control_tx, control_rx) = mpsc::channel(100);
503 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
504
505 let channels = Self {
506 component_event_tx,
507 _control_tx: control_tx,
508 control_signal_tx,
509 };
510
511 let receivers = EventReceivers {
512 component_event_rx,
513 _control_rx: control_rx,
514 control_signal_rx,
515 };
516
517 (channels, receivers)
518 }
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
525
526 fn create_test_source_change() -> SourceChange {
527 let element = Element::Node {
528 metadata: ElementMetadata {
529 reference: ElementReference::new("TestSource", "test-node-1"),
530 labels: vec!["TestLabel".into()].into(),
531 effective_from: 1000,
532 },
533 properties: Default::default(),
534 };
535 SourceChange::Insert { element }
536 }
537
538 #[test]
539 fn test_source_event_wrapper_into_parts() {
540 let change = create_test_source_change();
541 let wrapper = SourceEventWrapper::new(
542 "test-source".to_string(),
543 SourceEvent::Change(change),
544 chrono::Utc::now(),
545 );
546
547 let (source_id, event, _timestamp, profiling) = wrapper.into_parts();
548
549 assert_eq!(source_id, "test-source");
550 assert!(matches!(event, SourceEvent::Change(_)));
551 assert!(profiling.is_none());
552 }
553
554 #[test]
555 fn test_try_unwrap_arc_sole_owner() {
556 let change = create_test_source_change();
557 let wrapper = SourceEventWrapper::new(
558 "test-source".to_string(),
559 SourceEvent::Change(change),
560 chrono::Utc::now(),
561 );
562 let arc = Arc::new(wrapper);
563
564 let result = SourceEventWrapper::try_unwrap_arc(arc);
566 assert!(result.is_ok());
567
568 let (source_id, event, _timestamp, _profiling) = result.unwrap();
569 assert_eq!(source_id, "test-source");
570 assert!(matches!(event, SourceEvent::Change(_)));
571 }
572
573 #[test]
574 fn test_try_unwrap_arc_shared() {
575 let change = create_test_source_change();
576 let wrapper = SourceEventWrapper::new(
577 "test-source".to_string(),
578 SourceEvent::Change(change),
579 chrono::Utc::now(),
580 );
581 let arc = Arc::new(wrapper);
582 let _arc2 = arc.clone(); let result = SourceEventWrapper::try_unwrap_arc(arc);
586 assert!(result.is_err());
587
588 let returned_arc = result.unwrap_err();
590 assert_eq!(returned_arc.source_id, "test-source");
591 }
592
593 #[test]
594 fn test_zero_copy_extraction_path() {
595 let change = create_test_source_change();
597 let wrapper = SourceEventWrapper::new(
598 "test-source".to_string(),
599 SourceEvent::Change(change),
600 chrono::Utc::now(),
601 );
602 let arc = Arc::new(wrapper);
603
604 let (source_id, event, _timestamp, _profiling) =
606 match SourceEventWrapper::try_unwrap_arc(arc) {
607 Ok(parts) => parts,
608 Err(arc) => {
609 (
611 arc.source_id.clone(),
612 arc.event.clone(),
613 arc.timestamp,
614 arc.profiling.clone(),
615 )
616 }
617 };
618
619 let source_change = match event {
621 SourceEvent::Change(change) => Some(change),
622 _ => None,
623 };
624
625 assert_eq!(source_id, "test-source");
626 assert!(source_change.is_some());
627 }
628}