1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
22pub trait Timestamped {
24 fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
31pub enum ComponentType {
32 Source,
33 Query,
34 Reaction,
35 BootstrapProvider,
36 IdentityProvider,
37}
38
39#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
162pub enum ComponentStatus {
163 Starting,
164 Running,
165 Stopping,
166 Stopped,
167 Reconfiguring,
168 Error,
169}
170
171#[derive(Debug, Clone)]
173pub struct SourceChangeEvent {
174 pub source_id: String,
175 pub change: SourceChange,
176 pub timestamp: chrono::DateTime<chrono::Utc>,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
181pub enum SourceControl {
182 Subscription {
184 query_id: String,
185 query_node_id: String,
186 node_labels: Vec<String>,
187 rel_labels: Vec<String>,
188 operation: ControlOperation,
189 },
190 FuturesDue,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
196pub enum ControlOperation {
197 Insert,
198 Update,
199 Delete,
200}
201
202#[derive(Debug, Clone)]
204pub enum SourceEvent {
205 Change(SourceChange),
207 Control(SourceControl),
209}
210
211#[derive(Debug, Clone)]
213pub struct SourceEventWrapper {
214 pub source_id: String,
215 pub event: SourceEvent,
216 pub timestamp: chrono::DateTime<chrono::Utc>,
217 pub profiling: Option<ProfilingMetadata>,
219}
220
221impl SourceEventWrapper {
222 pub fn new(
224 source_id: String,
225 event: SourceEvent,
226 timestamp: chrono::DateTime<chrono::Utc>,
227 ) -> Self {
228 Self {
229 source_id,
230 event,
231 timestamp,
232 profiling: None,
233 }
234 }
235
236 pub fn with_profiling(
238 source_id: String,
239 event: SourceEvent,
240 timestamp: chrono::DateTime<chrono::Utc>,
241 profiling: ProfilingMetadata,
242 ) -> Self {
243 Self {
244 source_id,
245 event,
246 timestamp,
247 profiling: Some(profiling),
248 }
249 }
250
251 pub fn into_parts(
254 self,
255 ) -> (
256 String,
257 SourceEvent,
258 chrono::DateTime<chrono::Utc>,
259 Option<ProfilingMetadata>,
260 ) {
261 (self.source_id, self.event, self.timestamp, self.profiling)
262 }
263
264 pub fn try_unwrap_arc(
271 arc_self: Arc<Self>,
272 ) -> Result<
273 (
274 String,
275 SourceEvent,
276 chrono::DateTime<chrono::Utc>,
277 Option<ProfilingMetadata>,
278 ),
279 Arc<Self>,
280 > {
281 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
282 }
283}
284
285impl Timestamped for SourceEventWrapper {
287 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
288 self.timestamp
289 }
290}
291
292pub type ArcSourceEvent = Arc<SourceEventWrapper>;
294
295#[derive(Debug, Clone)]
297pub struct BootstrapEvent {
298 pub source_id: String,
299 pub change: SourceChange,
300 pub timestamp: chrono::DateTime<chrono::Utc>,
301 pub sequence: u64,
302}
303
/// Completion record for a bootstrap: the source id plus an event total.
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Identifier of the source the record refers to.
    pub source_id: String,
    /// Total number of events reported for the bootstrap.
    pub total_events: u64,
}
310
/// Parameters of a subscription request linking a query to a source.
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the requesting query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is enabled for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels named in the request.
    pub node_labels: Vec<String>,
    /// Relation labels named in the request.
    pub relation_labels: Vec<String>,
}
320
321pub struct SubscriptionResponse {
323 pub query_id: String,
324 pub source_id: String,
325 pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
326 pub bootstrap_receiver: Option<BootstrapEventReceiver>,
327}
328
329pub struct QuerySubscriptionResponse {
331 pub query_id: String,
332 pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
333}
334
335#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
337#[serde(tag = "type")]
338pub enum ResultDiff {
339 #[serde(rename = "ADD")]
340 Add { data: serde_json::Value },
341 #[serde(rename = "DELETE")]
342 Delete { data: serde_json::Value },
343 #[serde(rename = "UPDATE")]
344 Update {
345 data: serde_json::Value,
346 before: serde_json::Value,
347 after: serde_json::Value,
348 #[serde(skip_serializing_if = "Option::is_none")]
349 grouping_keys: Option<Vec<String>>,
350 },
351 #[serde(rename = "aggregation")]
352 Aggregation {
353 before: Option<serde_json::Value>,
354 after: serde_json::Value,
355 },
356 #[serde(rename = "noop")]
357 Noop,
358}
359
360#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct QueryResult {
366 pub query_id: String,
367 pub timestamp: chrono::DateTime<chrono::Utc>,
368 pub results: Vec<ResultDiff>,
369 pub metadata: HashMap<String, serde_json::Value>,
370 #[serde(skip_serializing_if = "Option::is_none")]
372 pub profiling: Option<ProfilingMetadata>,
373}
374
375impl QueryResult {
376 pub fn new(
378 query_id: String,
379 timestamp: chrono::DateTime<chrono::Utc>,
380 results: Vec<ResultDiff>,
381 metadata: HashMap<String, serde_json::Value>,
382 ) -> Self {
383 Self {
384 query_id,
385 timestamp,
386 results,
387 metadata,
388 profiling: None,
389 }
390 }
391
392 pub fn with_profiling(
394 query_id: String,
395 timestamp: chrono::DateTime<chrono::Utc>,
396 results: Vec<ResultDiff>,
397 metadata: HashMap<String, serde_json::Value>,
398 profiling: ProfilingMetadata,
399 ) -> Self {
400 Self {
401 query_id,
402 timestamp,
403 results,
404 metadata,
405 profiling: Some(profiling),
406 }
407 }
408}
409
410impl Timestamped for QueryResult {
412 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
413 self.timestamp
414 }
415}
416
417pub type ArcQueryResult = Arc<QueryResult>;
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
425pub struct ComponentEvent {
426 pub component_id: String,
427 pub component_type: ComponentType,
428 pub status: ComponentStatus,
429 pub timestamp: chrono::DateTime<chrono::Utc>,
430 pub message: Option<String>,
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize)]
435pub enum ControlMessage {
436 Start(String),
437 Stop(String),
438 Status(String),
439 Shutdown,
440}
441
442pub type ComponentEventBroadcastSender = broadcast::Sender<ComponentEvent>;
443pub type ComponentEventBroadcastReceiver = broadcast::Receiver<ComponentEvent>;
444pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
447pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
448pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
449pub type ControlMessageSender = mpsc::Sender<ControlMessage>;
450
451pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
453pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;
454
455pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
457pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;
458
459pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
461pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
462pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
463pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
464
465#[derive(Debug, Clone, Serialize, Deserialize)]
467pub enum ControlSignal {
468 Running { query_id: String },
470 Stopped { query_id: String },
472 Deleted { query_id: String },
474}
475
476#[derive(Debug, Clone)]
478pub struct ControlSignalWrapper {
479 pub signal: ControlSignal,
480 pub timestamp: chrono::DateTime<chrono::Utc>,
481 pub sequence_number: Option<u64>,
482}
483
484impl ControlSignalWrapper {
485 pub fn new(signal: ControlSignal) -> Self {
486 Self {
487 signal,
488 timestamp: chrono::Utc::now(),
489 sequence_number: None,
490 }
491 }
492
493 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
494 Self {
495 signal,
496 timestamp: chrono::Utc::now(),
497 sequence_number: Some(sequence_number),
498 }
499 }
500}
501
502pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
503pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
504
505pub struct EventChannels {
506 pub _control_tx: ControlMessageSender,
507 pub control_signal_tx: ControlSignalSender,
508}
509
510pub struct EventReceivers {
511 pub _control_rx: ControlMessageReceiver,
512 pub control_signal_rx: ControlSignalReceiver,
513}
514
515impl EventChannels {
516 pub fn new() -> (Self, EventReceivers) {
517 let (control_tx, control_rx) = mpsc::channel(100);
518 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
519
520 let channels = Self {
521 _control_tx: control_tx,
522 control_signal_tx,
523 };
524
525 let receivers = EventReceivers {
526 _control_rx: control_rx,
527 control_signal_rx,
528 };
529
530 (channels, receivers)
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds an `Insert` change for a single labelled node with default
    /// (empty) properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Wraps the test change in a `SourceEventWrapper` for "test-source"
    /// without profiling metadata.
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = create_test_wrapper().into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        // Only one strong reference exists, so unwrapping must succeed.
        let sole_owner = Arc::new(create_test_wrapper());

        let unwrapped = SourceEventWrapper::try_unwrap_arc(sole_owner);
        assert!(unwrapped.is_ok());

        let (source_id, event, _timestamp, _profiling) = unwrapped.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(create_test_wrapper());
        // A second strong reference keeps the Arc shared for the whole test.
        let _second = Arc::clone(&first);

        let outcome = SourceEventWrapper::try_unwrap_arc(first);
        assert!(outcome.is_err());

        // The original Arc comes back untouched in the error.
        let returned_arc = outcome.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    #[test]
    fn test_zero_copy_extraction_path() {
        let shared_event = Arc::new(create_test_wrapper());

        // Prefer moving the parts out; fall back to cloning when shared.
        let (source_id, event, _timestamp, _profiling) =
            match SourceEventWrapper::try_unwrap_arc(shared_event) {
                Ok(parts) => parts,
                Err(still_shared) => (*still_shared).clone().into_parts(),
            };

        let source_change = match event {
            SourceEvent::Change(change) => Some(change),
            SourceEvent::Control(_) => None,
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }
}