1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
22pub trait Timestamped {
24 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
31pub enum ComponentType {
32 Source,
33 Query,
34 Reaction,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161 Starting,
162 Running,
163 Stopping,
164 Stopped,
165 Error,
166}
167
168#[derive(Debug, Clone)]
169pub struct SourceChangeEvent {
170 pub source_id: String,
171 pub change: SourceChange,
172 pub timestamp: chrono::DateTime<chrono::Utc>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
177pub enum SourceControl {
178 Subscription {
180 query_id: String,
181 query_node_id: String,
182 node_labels: Vec<String>,
183 rel_labels: Vec<String>,
184 operation: ControlOperation,
185 },
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
190pub enum ControlOperation {
191 Insert,
192 Update,
193 Delete,
194}
195
196#[derive(Debug, Clone)]
198pub enum SourceEvent {
199 Change(SourceChange),
201 Control(SourceControl),
203 BootstrapStart { query_id: String },
205 BootstrapEnd { query_id: String },
207}
208
209#[derive(Debug, Clone)]
211pub struct SourceEventWrapper {
212 pub source_id: String,
213 pub event: SourceEvent,
214 pub timestamp: chrono::DateTime<chrono::Utc>,
215 pub profiling: Option<ProfilingMetadata>,
217}
218
219impl SourceEventWrapper {
220 pub fn new(
222 source_id: String,
223 event: SourceEvent,
224 timestamp: chrono::DateTime<chrono::Utc>,
225 ) -> Self {
226 Self {
227 source_id,
228 event,
229 timestamp,
230 profiling: None,
231 }
232 }
233
234 pub fn with_profiling(
236 source_id: String,
237 event: SourceEvent,
238 timestamp: chrono::DateTime<chrono::Utc>,
239 profiling: ProfilingMetadata,
240 ) -> Self {
241 Self {
242 source_id,
243 event,
244 timestamp,
245 profiling: Some(profiling),
246 }
247 }
248
249 pub fn into_parts(
252 self,
253 ) -> (
254 String,
255 SourceEvent,
256 chrono::DateTime<chrono::Utc>,
257 Option<ProfilingMetadata>,
258 ) {
259 (self.source_id, self.event, self.timestamp, self.profiling)
260 }
261
262 pub fn try_unwrap_arc(
269 arc_self: Arc<Self>,
270 ) -> Result<
271 (
272 String,
273 SourceEvent,
274 chrono::DateTime<chrono::Utc>,
275 Option<ProfilingMetadata>,
276 ),
277 Arc<Self>,
278 > {
279 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
280 }
281}
282
283impl Timestamped for SourceEventWrapper {
285 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
286 self.timestamp
287 }
288}
289
290pub type ArcSourceEvent = Arc<SourceEventWrapper>;
292
293#[derive(Debug, Clone)]
295pub struct BootstrapEvent {
296 pub source_id: String,
297 pub change: SourceChange,
298 pub timestamp: chrono::DateTime<chrono::Utc>,
299 pub sequence: u64,
300}
301
302#[derive(Debug, Clone)]
304pub struct BootstrapComplete {
305 pub source_id: String,
306 pub total_events: u64,
307}
308
309#[derive(Debug, Clone)]
311pub struct SubscriptionRequest {
312 pub query_id: String,
313 pub source_id: String,
314 pub enable_bootstrap: bool,
315 pub node_labels: Vec<String>,
316 pub relation_labels: Vec<String>,
317}
318
319pub struct SubscriptionResponse {
321 pub query_id: String,
322 pub source_id: String,
323 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
324 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
325}
326
327pub struct QuerySubscriptionResponse {
329 pub query_id: String,
330 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
331}
332
333#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
335#[serde(tag = "type")]
336pub enum ResultDiff {
337 #[serde(rename = "ADD")]
338 Add { data: serde_json::Value },
339 #[serde(rename = "DELETE")]
340 Delete { data: serde_json::Value },
341 #[serde(rename = "UPDATE")]
342 Update {
343 data: serde_json::Value,
344 before: serde_json::Value,
345 after: serde_json::Value,
346 #[serde(skip_serializing_if = "Option::is_none")]
347 grouping_keys: Option<Vec<String>>,
348 },
349 #[serde(rename = "aggregation")]
350 Aggregation {
351 before: Option<serde_json::Value>,
352 after: serde_json::Value,
353 },
354 #[serde(rename = "noop")]
355 Noop,
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct QueryResult {
360 pub query_id: String,
361 pub timestamp: chrono::DateTime<chrono::Utc>,
362 pub results: Vec<ResultDiff>,
363 pub metadata: HashMap<String, serde_json::Value>,
364 #[serde(skip_serializing_if = "Option::is_none")]
366 pub profiling: Option<ProfilingMetadata>,
367}
368
369impl QueryResult {
370 pub fn new(
372 query_id: String,
373 timestamp: chrono::DateTime<chrono::Utc>,
374 results: Vec<ResultDiff>,
375 metadata: HashMap<String, serde_json::Value>,
376 ) -> Self {
377 Self {
378 query_id,
379 timestamp,
380 results,
381 metadata,
382 profiling: None,
383 }
384 }
385
386 pub fn with_profiling(
388 query_id: String,
389 timestamp: chrono::DateTime<chrono::Utc>,
390 results: Vec<ResultDiff>,
391 metadata: HashMap<String, serde_json::Value>,
392 profiling: ProfilingMetadata,
393 ) -> Self {
394 Self {
395 query_id,
396 timestamp,
397 results,
398 metadata,
399 profiling: Some(profiling),
400 }
401 }
402}
403
404impl Timestamped for QueryResult {
406 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
407 self.timestamp
408 }
409}
410
411pub type ArcQueryResult = Arc<QueryResult>;
413
414#[derive(Debug, Clone, Serialize, Deserialize)]
415pub struct ComponentEvent {
416 pub component_id: String,
417 pub component_type: ComponentType,
418 pub status: ComponentStatus,
419 pub timestamp: chrono::DateTime<chrono::Utc>,
420 pub message: Option<String>,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub enum ControlMessage {
425 Start(String),
426 Stop(String),
427 Status(String),
428 Shutdown,
429}
430
431pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
432pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
433pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
434pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
435
436pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
438pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
439
440pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
442pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
443
444pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
446pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
447pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
448pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
449
450#[derive(Debug, Clone, Serialize, Deserialize)]
452pub enum ControlSignal {
453 Running { query_id: String },
455 Stopped { query_id: String },
457 Deleted { query_id: String },
459}
460
461#[derive(Debug, Clone)]
463pub struct ControlSignalWrapper {
464 pub signal: ControlSignal,
465 pub timestamp: chrono::DateTime<chrono::Utc>,
466 pub sequence_number: Option<u64>,
467}
468
469impl ControlSignalWrapper {
470 pub fn new(signal: ControlSignal) -> Self {
471 Self {
472 signal,
473 timestamp: chrono::Utc::now(),
474 sequence_number: None,
475 }
476 }
477
478 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
479 Self {
480 signal,
481 timestamp: chrono::Utc::now(),
482 sequence_number: Some(sequence_number),
483 }
484 }
485}
486
487pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
488pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
489
490pub struct EventChannels {
491 pub component_event_tx: ComponentEventSender,
492 pub _control_tx: ControlMessageSender,
493 pub control_signal_tx: ControlSignalSender,
494}
495
496pub struct EventReceivers {
497 pub component_event_rx: ComponentEventReceiver,
498 pub _control_rx: ControlMessageReceiver,
499 pub control_signal_rx: ControlSignalReceiver,
500}
501
502impl EventChannels {
503 pub fn new() -> (Self, EventReceivers) {
504 let (component_event_tx, component_event_rx) = mpsc::channel(1000);
505 let (control_tx, control_rx) = mpsc::channel(100);
506 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
507
508 let channels = Self {
509 component_event_tx,
510 _control_tx: control_tx,
511 control_signal_tx,
512 };
513
514 let receivers = EventReceivers {
515 component_event_rx,
516 _control_rx: control_rx,
517 control_signal_rx,
518 };
519
520 (channels, receivers)
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
528
529 fn create_test_source_change() -> SourceChange {
530 let element = Element::Node {
531 metadata: ElementMetadata {
532 reference: ElementReference::new("TestSource", "test-node-1"),
533 labels: vec!["TestLabel".into()].into(),
534 effective_from: 1000,
535 },
536 properties: Default::default(),
537 };
538 SourceChange::Insert { element }
539 }
540
541 #[test]
542 fn test_source_event_wrapper_into_parts() {
543 let change = create_test_source_change();
544 let wrapper = SourceEventWrapper::new(
545 "test-source".to_string(),
546 SourceEvent::Change(change),
547 chrono::Utc::now(),
548 );
549
550 let (source_id, event, _timestamp, profiling) = wrapper.into_parts();
551
552 assert_eq!(source_id, "test-source");
553 assert!(matches!(event, SourceEvent::Change(_)));
554 assert!(profiling.is_none());
555 }
556
557 #[test]
558 fn test_try_unwrap_arc_sole_owner() {
559 let change = create_test_source_change();
560 let wrapper = SourceEventWrapper::new(
561 "test-source".to_string(),
562 SourceEvent::Change(change),
563 chrono::Utc::now(),
564 );
565 let arc = Arc::new(wrapper);
566
567 let result = SourceEventWrapper::try_unwrap_arc(arc);
569 assert!(result.is_ok());
570
571 let (source_id, event, _timestamp, _profiling) = result.unwrap();
572 assert_eq!(source_id, "test-source");
573 assert!(matches!(event, SourceEvent::Change(_)));
574 }
575
576 #[test]
577 fn test_try_unwrap_arc_shared() {
578 let change = create_test_source_change();
579 let wrapper = SourceEventWrapper::new(
580 "test-source".to_string(),
581 SourceEvent::Change(change),
582 chrono::Utc::now(),
583 );
584 let arc = Arc::new(wrapper);
585 let _arc2 = arc.clone(); let result = SourceEventWrapper::try_unwrap_arc(arc);
589 assert!(result.is_err());
590
591 let returned_arc = result.unwrap_err();
593 assert_eq!(returned_arc.source_id, "test-source");
594 }
595
596 #[test]
597 fn test_zero_copy_extraction_path() {
598 let change = create_test_source_change();
600 let wrapper = SourceEventWrapper::new(
601 "test-source".to_string(),
602 SourceEvent::Change(change),
603 chrono::Utc::now(),
604 );
605 let arc = Arc::new(wrapper);
606
607 let (source_id, event, _timestamp, _profiling) =
609 match SourceEventWrapper::try_unwrap_arc(arc) {
610 Ok(parts) => parts,
611 Err(arc) => {
612 (
614 arc.source_id.clone(),
615 arc.event.clone(),
616 arc.timestamp,
617 arc.profiling.clone(),
618 )
619 }
620 };
621
622 let source_change = match event {
624 SourceEvent::Change(change) => Some(change),
625 _ => None,
626 };
627
628 assert_eq!(source_id, "test-source");
629 assert!(source_change.is_some());
630 }
631}