1use crate::profiling::ProfilingMetadata;
16use drasi_core::models::SourceChange;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{broadcast, mpsc};
21
/// Implemented by types that can report a UTC timestamp for themselves.
pub trait Timestamped {
    /// Returns the UTC timestamp associated with this value.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
26
/// Identifies which kind of component a value refers to.
///
/// Derives `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A query component.
    Query,
    /// A reaction component.
    Reaction,
}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum ComponentStatus {
161 Starting,
162 Running,
163 Stopping,
164 Stopped,
165 Reconfiguring,
166 Error,
167}
168
/// A [`SourceChange`] paired with the id of its source and a UTC timestamp.
#[derive(Debug, Clone)]
pub struct SourceChangeEvent {
    /// Identifier of the source the change is attributed to.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
175
/// Control (non-data) payloads that can travel in a [`SourceEvent::Control`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourceControl {
    /// A subscription-related control record for a query.
    Subscription {
        /// Identifier of the query the record concerns.
        query_id: String,
        /// Identifier of the query node.
        // NOTE(review): presumably the node/host running the query — confirm.
        query_node_id: String,
        /// Node labels listed in the record.
        node_labels: Vec<String>,
        /// Relation labels listed in the record.
        rel_labels: Vec<String>,
        /// Whether the record is an insert, update or delete.
        operation: ControlOperation,
    },
    /// Unit variant carrying no data.
    // NOTE(review): presumably signals that scheduled "future" work is due — confirm.
    FuturesDue,
}
190
191#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
193pub enum ControlOperation {
194 Insert,
195 Update,
196 Delete,
197}
198
/// Payload of a source event: either a data change or a control record.
#[derive(Debug, Clone)]
pub enum SourceEvent {
    /// A data change.
    Change(SourceChange),
    /// A control record.
    Control(SourceControl),
}
207
/// A [`SourceEvent`] together with its source id, a UTC timestamp and
/// optional profiling metadata.
#[derive(Debug, Clone)]
pub struct SourceEventWrapper {
    /// Identifier of the source the event is attributed to.
    pub source_id: String,
    /// The wrapped event.
    pub event: SourceEvent,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Profiling metadata, if any was attached at construction.
    pub profiling: Option<ProfilingMetadata>,
}
217
218impl SourceEventWrapper {
219 pub fn new(
221 source_id: String,
222 event: SourceEvent,
223 timestamp: chrono::DateTime<chrono::Utc>,
224 ) -> Self {
225 Self {
226 source_id,
227 event,
228 timestamp,
229 profiling: None,
230 }
231 }
232
233 pub fn with_profiling(
235 source_id: String,
236 event: SourceEvent,
237 timestamp: chrono::DateTime<chrono::Utc>,
238 profiling: ProfilingMetadata,
239 ) -> Self {
240 Self {
241 source_id,
242 event,
243 timestamp,
244 profiling: Some(profiling),
245 }
246 }
247
248 pub fn into_parts(
251 self,
252 ) -> (
253 String,
254 SourceEvent,
255 chrono::DateTime<chrono::Utc>,
256 Option<ProfilingMetadata>,
257 ) {
258 (self.source_id, self.event, self.timestamp, self.profiling)
259 }
260
261 pub fn try_unwrap_arc(
268 arc_self: Arc<Self>,
269 ) -> Result<
270 (
271 String,
272 SourceEvent,
273 chrono::DateTime<chrono::Utc>,
274 Option<ProfilingMetadata>,
275 ),
276 Arc<Self>,
277 > {
278 Arc::try_unwrap(arc_self).map(|wrapper| wrapper.into_parts())
279 }
280}
281
/// Exposes the wrapper's `timestamp` field through [`Timestamped`].
impl Timestamped for SourceEventWrapper {
    /// Returns a copy of the `timestamp` field.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
        self.timestamp
    }
}
288
/// Reference-counted handle to a [`SourceEventWrapper`].
pub type ArcSourceEvent = Arc<SourceEventWrapper>;
291
/// A [`SourceChange`] sent over the bootstrap channel
/// (see [`BootstrapEventSender`]).
#[derive(Debug, Clone)]
pub struct BootstrapEvent {
    /// Identifier of the source the change is attributed to.
    pub source_id: String,
    /// The change payload.
    pub change: SourceChange,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number of this event.
    // NOTE(review): presumably its position within the bootstrap stream — confirm.
    pub sequence: u64,
}
300
/// Message sent over the bootstrap-complete channel
/// (see [`BootstrapCompleteSender`]).
#[derive(Debug, Clone)]
pub struct BootstrapComplete {
    /// Identifier of the source the message concerns.
    pub source_id: String,
    /// Event count reported with the message.
    // NOTE(review): presumably the number of `BootstrapEvent`s sent — confirm.
    pub total_events: u64,
}
307
/// Parameters describing a query's subscription to a source.
#[derive(Debug, Clone)]
pub struct SubscriptionRequest {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source being subscribed to.
    pub source_id: String,
    /// Whether bootstrap is requested for this subscription.
    pub enable_bootstrap: bool,
    /// Node labels listed in the request.
    pub node_labels: Vec<String>,
    /// Relation labels listed in the request.
    pub relation_labels: Vec<String>,
}
317
/// Receivers handed back for a source subscription.
pub struct SubscriptionResponse {
    /// Identifier of the subscribing query.
    pub query_id: String,
    /// Identifier of the source subscribed to.
    pub source_id: String,
    /// Receiver of [`SourceEventWrapper`] values.
    pub receiver: Box<dyn super::ChangeReceiver<SourceEventWrapper>>,
    /// Receiver of [`BootstrapEvent`]s, when one is provided.
    pub bootstrap_receiver: Option<BootstrapEventReceiver>,
}
325
/// Receiver handed back for a subscription to a query's results.
pub struct QuerySubscriptionResponse {
    /// Identifier of the query subscribed to.
    pub query_id: String,
    /// Receiver of [`QueryResult`] values.
    pub receiver: Box<dyn super::ChangeReceiver<QueryResult>>,
}
331
/// One entry in a [`QueryResult`]'s list of result differences.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `"type"` field whose value is the `rename` string on each variant.
// NOTE(review): tag casing is mixed ("ADD"/"DELETE"/"UPDATE" vs
// "aggregation"/"noop"). These strings are wire format, so they are left
// untouched here — confirm with consumers before normalizing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ResultDiff {
    /// Serialized with `"type": "ADD"`.
    #[serde(rename = "ADD")]
    Add { data: serde_json::Value },
    /// Serialized with `"type": "DELETE"`.
    #[serde(rename = "DELETE")]
    Delete { data: serde_json::Value },
    /// Serialized with `"type": "UPDATE"`.
    #[serde(rename = "UPDATE")]
    Update {
        data: serde_json::Value,
        /// Value before the update.
        before: serde_json::Value,
        /// Value after the update.
        after: serde_json::Value,
        /// Omitted from the serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        grouping_keys: Option<Vec<String>>,
    },
    /// Serialized with `"type": "aggregation"`.
    #[serde(rename = "aggregation")]
    Aggregation {
        /// Previous value, if there was one.
        before: Option<serde_json::Value>,
        /// New value.
        after: serde_json::Value,
    },
    /// Serialized with `"type": "noop"`; carries no data.
    #[serde(rename = "noop")]
    Noop,
}
356
/// A batch of [`ResultDiff`]s attributed to one query, with a UTC timestamp,
/// free-form metadata and optional profiling metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Identifier of the query the results belong to.
    pub query_id: String,
    /// UTC timestamp attached to the result.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The result differences.
    pub results: Vec<ResultDiff>,
    /// Arbitrary JSON metadata keyed by string.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Profiling metadata; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profiling: Option<ProfilingMetadata>,
}
367
368impl QueryResult {
369 pub fn new(
371 query_id: String,
372 timestamp: chrono::DateTime<chrono::Utc>,
373 results: Vec<ResultDiff>,
374 metadata: HashMap<String, serde_json::Value>,
375 ) -> Self {
376 Self {
377 query_id,
378 timestamp,
379 results,
380 metadata,
381 profiling: None,
382 }
383 }
384
385 pub fn with_profiling(
387 query_id: String,
388 timestamp: chrono::DateTime<chrono::Utc>,
389 results: Vec<ResultDiff>,
390 metadata: HashMap<String, serde_json::Value>,
391 profiling: ProfilingMetadata,
392 ) -> Self {
393 Self {
394 query_id,
395 timestamp,
396 results,
397 metadata,
398 profiling: Some(profiling),
399 }
400 }
401}
402
/// Exposes the result's `timestamp` field through [`Timestamped`].
impl Timestamped for QueryResult {
    /// Returns a copy of the `timestamp` field.
    fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
        self.timestamp
    }
}
409
/// Reference-counted handle to a [`QueryResult`].
pub type ArcQueryResult = Arc<QueryResult>;
412
/// A status report for a single component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEvent {
    /// Identifier of the component the event concerns.
    pub component_id: String,
    /// Kind of component.
    pub component_type: ComponentType,
    /// Status being reported.
    pub status: ComponentStatus,
    /// UTC timestamp attached to the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional human-readable message accompanying the status.
    pub message: Option<String>,
}
421
/// Messages carried on the control channel (see [`ControlMessageSender`]).
// NOTE(review): the `String` payloads are presumably component ids — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
    /// Start request carrying a string payload.
    Start(String),
    /// Stop request carrying a string payload.
    Stop(String),
    /// Status request carrying a string payload.
    Status(String),
    /// Shutdown request; carries no payload.
    Shutdown,
}
429
/// Receiving half of an `mpsc` channel of [`ComponentEvent`]s.
pub type ComponentEventReceiver = mpsc::Receiver<ComponentEvent>;
/// Sending half of an `mpsc` channel of [`ComponentEvent`]s.
pub type ComponentEventSender = mpsc::Sender<ComponentEvent>;
/// Receiving half of an `mpsc` channel of [`ControlMessage`]s.
pub type ControlMessageReceiver = mpsc::Receiver<ControlMessage>;
/// Sending half of an `mpsc` channel of [`ControlMessage`]s.
pub type ControlMessageSender = mpsc::Sender<ControlMessage>;

/// Sending half of a `broadcast` channel of shared source events.
pub type SourceBroadcastSender = broadcast::Sender<ArcSourceEvent>;
/// Receiving half of a `broadcast` channel of shared source events.
pub type SourceBroadcastReceiver = broadcast::Receiver<ArcSourceEvent>;

/// Sending half of a `broadcast` channel of shared query results.
pub type QueryResultBroadcastSender = broadcast::Sender<ArcQueryResult>;
/// Receiving half of a `broadcast` channel of shared query results.
pub type QueryResultBroadcastReceiver = broadcast::Receiver<ArcQueryResult>;

/// Sending half of an `mpsc` channel of [`BootstrapEvent`]s.
pub type BootstrapEventSender = mpsc::Sender<BootstrapEvent>;
/// Receiving half of an `mpsc` channel of [`BootstrapEvent`]s.
pub type BootstrapEventReceiver = mpsc::Receiver<BootstrapEvent>;
/// Sending half of an `mpsc` channel of [`BootstrapComplete`] messages.
pub type BootstrapCompleteSender = mpsc::Sender<BootstrapComplete>;
/// Receiving half of an `mpsc` channel of [`BootstrapComplete`] messages.
pub type BootstrapCompleteReceiver = mpsc::Receiver<BootstrapComplete>;
448
/// Signals about a query, each identified by the query's id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlSignal {
    /// The identified query is running.
    Running { query_id: String },
    /// The identified query has stopped.
    Stopped { query_id: String },
    /// The identified query has been deleted.
    Deleted { query_id: String },
}
459
/// A [`ControlSignal`] with a UTC timestamp and an optional sequence number.
#[derive(Debug, Clone)]
pub struct ControlSignalWrapper {
    /// The wrapped signal.
    pub signal: ControlSignal,
    /// UTC timestamp; the constructors on this type set it to the current time.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Sequence number, when one was supplied at construction.
    pub sequence_number: Option<u64>,
}
467
468impl ControlSignalWrapper {
469 pub fn new(signal: ControlSignal) -> Self {
470 Self {
471 signal,
472 timestamp: chrono::Utc::now(),
473 sequence_number: None,
474 }
475 }
476
477 pub fn with_sequence(signal: ControlSignal, sequence_number: u64) -> Self {
478 Self {
479 signal,
480 timestamp: chrono::Utc::now(),
481 sequence_number: Some(sequence_number),
482 }
483 }
484}
485
/// Receiving half of an `mpsc` channel of [`ControlSignalWrapper`]s.
pub type ControlSignalReceiver = mpsc::Receiver<ControlSignalWrapper>;
/// Sending half of an `mpsc` channel of [`ControlSignalWrapper`]s.
pub type ControlSignalSender = mpsc::Sender<ControlSignalWrapper>;
488
/// Sending halves of the channels created by [`EventChannels::new`].
pub struct EventChannels {
    /// Sender for [`ComponentEvent`]s.
    pub component_event_tx: ComponentEventSender,
    /// Sender for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_tx: ControlMessageSender,
    /// Sender for [`ControlSignalWrapper`]s.
    pub control_signal_tx: ControlSignalSender,
}
494
/// Receiving halves of the channels created by [`EventChannels::new`].
pub struct EventReceivers {
    /// Receiver for [`ComponentEvent`]s.
    pub component_event_rx: ComponentEventReceiver,
    /// Receiver for [`ControlMessage`]s.
    // NOTE(review): the leading underscore suggests this is currently unused — confirm.
    pub _control_rx: ControlMessageReceiver,
    /// Receiver for [`ControlSignalWrapper`]s.
    pub control_signal_rx: ControlSignalReceiver,
}
500
501impl EventChannels {
502 pub fn new() -> (Self, EventReceivers) {
503 let (component_event_tx, component_event_rx) = mpsc::channel(1000);
504 let (control_tx, control_rx) = mpsc::channel(100);
505 let (control_signal_tx, control_signal_rx) = mpsc::channel(100);
506
507 let channels = Self {
508 component_event_tx,
509 _control_tx: control_tx,
510 control_signal_tx,
511 };
512
513 let receivers = EventReceivers {
514 component_event_rx,
515 _control_rx: control_rx,
516 control_signal_rx,
517 };
518
519 (channels, receivers)
520 }
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};

    /// Builds an insert change for a single labelled node with default properties.
    fn create_test_source_change() -> SourceChange {
        let metadata = ElementMetadata {
            reference: ElementReference::new("TestSource", "test-node-1"),
            labels: vec!["TestLabel".into()].into(),
            effective_from: 1000,
        };
        SourceChange::Insert {
            element: Element::Node {
                metadata,
                properties: Default::default(),
            },
        }
    }

    /// Wraps the fixture change in a `SourceEventWrapper` stamped with the current time.
    fn create_test_wrapper() -> SourceEventWrapper {
        SourceEventWrapper::new(
            "test-source".to_string(),
            SourceEvent::Change(create_test_source_change()),
            chrono::Utc::now(),
        )
    }

    /// `into_parts` hands back the fields the wrapper was built with.
    #[test]
    fn test_source_event_wrapper_into_parts() {
        let (source_id, event, _timestamp, profiling) = create_test_wrapper().into_parts();

        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
        assert!(profiling.is_none());
    }

    /// With a single strong reference, `try_unwrap_arc` yields the parts.
    #[test]
    fn test_try_unwrap_arc_sole_owner() {
        let sole_owner = Arc::new(create_test_wrapper());

        let result = SourceEventWrapper::try_unwrap_arc(sole_owner);
        assert!(result.is_ok());

        let (source_id, event, _timestamp, _profiling) = result.unwrap();
        assert_eq!(source_id, "test-source");
        assert!(matches!(event, SourceEvent::Change(_)));
    }

    /// With a second strong reference alive, `try_unwrap_arc` returns the `Arc`.
    #[test]
    fn test_try_unwrap_arc_shared() {
        let first = Arc::new(create_test_wrapper());
        // Hold a second strong reference for the duration of the call.
        let _second = Arc::clone(&first);

        let result = SourceEventWrapper::try_unwrap_arc(first);
        assert!(result.is_err());

        let returned_arc = result.unwrap_err();
        assert_eq!(returned_arc.source_id, "test-source");
    }

    /// Exercises the caller-side pattern: unwrap when possible, clone otherwise.
    #[test]
    fn test_zero_copy_extraction_path() {
        let handle = Arc::new(create_test_wrapper());

        let (source_id, event, _timestamp, _profiling) =
            SourceEventWrapper::try_unwrap_arc(handle).unwrap_or_else(|shared| {
                // Fallback when the Arc is still shared: clone the fields out.
                (
                    shared.source_id.clone(),
                    shared.event.clone(),
                    shared.timestamp,
                    shared.profiling.clone(),
                )
            });

        let source_change = if let SourceEvent::Change(change) = event {
            Some(change)
        } else {
            None
        };

        assert_eq!(source_id, "test-source");
        assert!(source_change.is_some());
    }
}