1use crate::engine::Engine;
11use crate::event::{Event, SharedEvent};
12use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
13use rustc_hash::FxHashMap;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Instant;
17use tokio::sync::{mpsc, watch};
18use tracing::{error, info, warn};
19use varpulis_core::ast::{Program, Stmt, StreamSource};
20
/// Message delivered to a context runtime's input channel.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// A shared event for the context's engine to process.
    Event(SharedEvent),
    /// Checkpoint barrier from the coordinator; the context snapshots its
    /// engine state and sends an ack when it sees this.
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark advance originating from another context.
    WatermarkUpdate {
        /// Name of the context the watermark came from.
        source_context: String,
        /// New watermark position in epoch milliseconds.
        watermark_ms: i64,
    },
}
36
/// Barrier broadcast to every context to trigger a coordinated checkpoint.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Monotonically increasing checkpoint identifier.
    pub checkpoint_id: u64,
    /// Wall-clock time (epoch ms) at which the checkpoint was initiated.
    pub timestamp_ms: i64,
}
43
/// A context's acknowledgement of a checkpoint barrier, carrying its
/// engine state snapshot.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acknowledging context.
    pub context_name: String,
    /// Identifier of the barrier being acknowledged.
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state taken at the barrier.
    pub engine_checkpoint: EngineCheckpoint,
}
51
/// Bookkeeping for a checkpoint round that is still awaiting context acks.
struct PendingCheckpoint {
    // Identifier of the in-flight checkpoint.
    checkpoint_id: u64,
    // Initiation time (epoch ms), propagated into the final checkpoint.
    timestamp_ms: i64,
    // Engine snapshots received so far, keyed by context name.
    acks: HashMap<String, EngineCheckpoint>,
    // Used to warn when the round appears stalled.
    started_at: Instant,
}
59
/// Coordinates barrier-based checkpoints across all context runtimes.
pub struct CheckpointCoordinator {
    // Persists completed checkpoints and drives the checkpoint schedule.
    manager: CheckpointManager,
    // Cloned out to contexts so they can send acks back.
    ack_tx: mpsc::Sender<CheckpointAck>,
    // Receives acks from context runtimes.
    ack_rx: mpsc::Receiver<CheckpointAck>,
    // Names of all contexts expected to ack each barrier.
    context_names: Vec<String>,
    // The checkpoint round currently awaiting acks, if any.
    pending: Option<PendingCheckpoint>,
    // Identifier to assign to the next initiated checkpoint.
    next_checkpoint_id: u64,
}
72
73impl std::fmt::Debug for CheckpointCoordinator {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("CheckpointCoordinator")
76 .field("context_names", &self.context_names)
77 .field("next_checkpoint_id", &self.next_checkpoint_id)
78 .field("has_pending", &self.pending.is_some())
79 .finish_non_exhaustive()
80 }
81}
82
83impl CheckpointCoordinator {
84 pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
86 let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
87 Self {
88 manager,
89 ack_tx,
90 ack_rx,
91 context_names,
92 pending: None,
93 next_checkpoint_id: 1,
94 }
95 }
96
97 pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
99 self.ack_tx.clone()
100 }
101
102 pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
104 if self.pending.is_some() {
105 warn!("Checkpoint already in progress, skipping initiation");
106 return;
107 }
108
109 let checkpoint_id = self.next_checkpoint_id;
110 self.next_checkpoint_id += 1;
111 let timestamp_ms = chrono::Utc::now().timestamp_millis();
112
113 let barrier = CheckpointBarrier {
114 checkpoint_id,
115 timestamp_ms,
116 };
117
118 for (ctx_name, tx) in context_txs {
119 if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
120 error!(
121 "Failed to send checkpoint barrier to context '{}': {}",
122 ctx_name, e
123 );
124 }
125 }
126
127 self.pending = Some(PendingCheckpoint {
128 checkpoint_id,
129 timestamp_ms,
130 acks: HashMap::new(),
131 started_at: Instant::now(),
132 });
133
134 info!("Initiated checkpoint {}", checkpoint_id);
135 }
136
137 pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
139 let pending = self.pending.as_mut()?;
140
141 if ack.checkpoint_id != pending.checkpoint_id {
142 warn!(
143 "Received ack for checkpoint {} but expecting {}",
144 ack.checkpoint_id, pending.checkpoint_id
145 );
146 return None;
147 }
148
149 pending.acks.insert(ack.context_name, ack.engine_checkpoint);
150
151 if pending.acks.len() == self.context_names.len() {
152 let pending = self.pending.take().unwrap();
153 let mut context_states = HashMap::new();
154 for (name, cp) in pending.acks {
155 context_states.insert(name, cp);
156 }
157
158 Some(crate::persistence::Checkpoint {
159 id: pending.checkpoint_id,
160 timestamp_ms: pending.timestamp_ms,
161 events_processed: 0, window_states: HashMap::new(),
163 pattern_states: HashMap::new(),
164 metadata: HashMap::new(),
165 context_states,
166 })
167 } else {
168 None
169 }
170 }
171
172 pub fn try_complete(&mut self) -> Result<(), StoreError> {
174 while let Ok(ack) = self.ack_rx.try_recv() {
175 if let Some(checkpoint) = self.receive_ack(ack) {
176 self.manager.checkpoint(checkpoint)?;
177 return Ok(());
178 }
179 }
180
181 if let Some(ref pending) = self.pending {
183 if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
184 warn!(
185 "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
186 pending.checkpoint_id,
187 pending.started_at.elapsed().as_secs_f64()
188 );
189 }
190 }
191
192 Ok(())
193 }
194
195 pub fn should_checkpoint(&self) -> bool {
197 self.pending.is_none() && self.manager.should_checkpoint()
198 }
199
200 pub const fn has_pending(&self) -> bool {
202 self.pending.is_some()
203 }
204}
205
/// Configuration for a single execution context.
#[derive(Debug, Clone)]
pub struct ContextConfig {
    /// Context name, used as its identifier throughout the orchestrator.
    pub name: String,
    /// CPU cores to pin the context's thread to, if any.
    pub cores: Option<Vec<usize>>,
}
212
/// Static mapping of contexts, stream-to-context assignments, and
/// cross-context emit targets, derived from the program.
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    // Registered contexts keyed by name.
    contexts: HashMap<String, ContextConfig>,
    // Stream name -> owning context name.
    stream_assignments: HashMap<String, String>,
    // (stream name, emit index) -> target context name.
    cross_context_emits: HashMap<(String, usize), String>,
}
226
227impl ContextMap {
228 pub fn new() -> Self {
229 Self::default()
230 }
231
232 pub fn register_context(&mut self, config: ContextConfig) {
234 self.contexts.insert(config.name.clone(), config);
235 }
236
237 pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
239 self.stream_assignments.insert(stream_name, context_name);
240 }
241
242 pub fn add_cross_context_emit(
244 &mut self,
245 stream_name: String,
246 emit_index: usize,
247 target_context: String,
248 ) {
249 self.cross_context_emits
250 .insert((stream_name, emit_index), target_context);
251 }
252
253 pub fn has_contexts(&self) -> bool {
255 !self.contexts.is_empty()
256 }
257
258 pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
260 &self.contexts
261 }
262
263 pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
265 self.stream_assignments.get(stream_name).map(|s| s.as_str())
266 }
267
268 pub const fn stream_assignments(&self) -> &HashMap<String, String> {
270 &self.stream_assignments
271 }
272
273 pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
275 &self.cross_context_emits
276 }
277}
278
279pub fn filter_program_for_context(
286 program: &Program,
287 context_name: &str,
288 context_map: &ContextMap,
289) -> Program {
290 let filtered_statements = program
291 .statements
292 .iter()
293 .filter(|stmt| {
294 if let Stmt::StreamDecl { name, .. } = &stmt.node {
295 match context_map.stream_context(name) {
297 Some(ctx) => ctx == context_name,
298 None => true,
300 }
301 } else {
302 true
304 }
305 })
306 .cloned()
307 .collect();
308
309 Program {
310 statements: filtered_statements,
311 }
312}
313
/// Reads the current process's allowed-CPU list from `/proc/self/status`
/// (`Cpus_allowed_list` line) and expands it into individual core indices.
///
/// Returns `None` if the file cannot be read or the line is absent.
/// Malformed entries within the list are silently skipped.
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    for line in status.lines() {
        // `strip_prefix` both tests and removes the label in one step.
        if let Some(rest) = line.strip_prefix("Cpus_allowed_list:") {
            let list_str = rest.trim();
            let mut cores = Vec::new();
            // The list is comma-separated entries, each either a single
            // core ("3") or an inclusive range ("0-7").
            for part in list_str.split(',') {
                let part = part.trim();
                if let Some((start, end)) = part.split_once('-') {
                    if let (Ok(s), Ok(e)) = (start.parse::<usize>(), end.parse::<usize>()) {
                        cores.extend(s..=e);
                    }
                } else if let Ok(core) = part.parse::<usize>() {
                    cores.push(core);
                }
            }
            return Some(cores);
        }
    }
    None
}
342
/// Per-thread runtime that owns one engine instance and drives it from a
/// message channel until shutdown.
pub struct ContextRuntime {
    // Context name, used in logs and checkpoint acks.
    name: String,
    // Engine executing this context's filtered program.
    engine: Engine,
    // Global output channel shared by all contexts.
    output_tx: mpsc::Sender<Event>,
    // Inbox: events, checkpoint barriers, and watermark updates.
    event_rx: mpsc::Receiver<ContextMessage>,
    // Events emitted by the engine; drained and routed after each step.
    engine_output_rx: mpsc::Receiver<Event>,
    // Senders for every context, used for cross-context routing.
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    // Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    // Flips to true when the orchestrator requests shutdown.
    shutdown_rx: watch::Receiver<bool>,
    // Present only when checkpointing is enabled.
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    // Input events processed so far (reported at shutdown).
    events_processed: u64,
    // Engine output events forwarded so far (reported at shutdown).
    output_events_emitted: u64,
}
367
368impl std::fmt::Debug for ContextRuntime {
369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370 f.debug_struct("ContextRuntime")
371 .field("name", &self.name)
372 .field("ingress_routing", &self.ingress_routing)
373 .field("events_processed", &self.events_processed)
374 .field("output_events_emitted", &self.output_events_emitted)
375 .finish_non_exhaustive()
376 }
377}
378
impl ContextRuntime {
    /// Creates a runtime for one context. Checkpoint participation is
    /// disabled until [`Self::with_ack_sender`] attaches an ack channel.
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        name: String,
        engine: Engine,
        output_tx: mpsc::Sender<Event>,
        event_rx: mpsc::Receiver<ContextMessage>,
        engine_output_rx: mpsc::Receiver<Event>,
        all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
        ingress_routing: FxHashMap<String, String>,
        shutdown_rx: watch::Receiver<bool>,
    ) -> Self {
        Self {
            name,
            engine,
            output_tx,
            event_rx,
            engine_output_rx,
            all_context_txs,
            ingress_routing,
            shutdown_rx,
            ack_tx: None,
            events_processed: 0,
            output_events_emitted: 0,
        }
    }

    /// Enables checkpoint participation by attaching the ack channel.
    pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
        self.ack_tx = Some(ack_tx);
        self
    }

    /// Drains the engine's output queue, forwarding each event to the
    /// global output channel and — when its type is routed to another
    /// context — also to that context's inbox.
    fn drain_and_route_output(&mut self) {
        while let Ok(output_event) = self.engine_output_rx.try_recv() {
            self.output_events_emitted += 1;

            if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
                if let Some(tx) = self.all_context_txs.get(target_ctx) {
                    // Share the event with the target context, then recover
                    // the owned value (cloning only if the receiver still
                    // holds a reference) for the global output channel.
                    let shared = Arc::new(output_event);
                    let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
                    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
                    let _ = self.output_tx.try_send(owned);
                    continue;
                }
            }

            // No cross-context route: forward to the global output only.
            let _ = self.output_tx.try_send(output_event);
        }
    }

    /// Snapshots the engine and acks the barrier; no-op when checkpointing
    /// is not enabled for this runtime.
    async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
        if let Some(ref ack_tx) = self.ack_tx {
            let checkpoint = self.engine.create_checkpoint();
            let _ = ack_tx
                .send(CheckpointAck {
                    context_name: self.name.clone(),
                    checkpoint_id: barrier.checkpoint_id,
                    engine_checkpoint: checkpoint,
                })
                .await;
        }
    }

    /// Main loop: processes inbox messages, periodically sweeps expired
    /// session windows, and exits on the shutdown signal or when the inbox
    /// closes.
    pub async fn run(&mut self) {
        info!("Context '{}' runtime started", self.name);

        #[cfg(target_os = "linux")]
        if let Some(cores) = verify_cpu_affinity() {
            info!("Context '{}' running on cores {:?}", self.name, cores);
        }

        let has_sessions = self.engine.has_session_windows();
        // Sweep cadence follows the smallest session gap; falls back to 60s
        // when the engine reports none (or the gap doesn't convert).
        let sweep_interval = self
            .engine
            .min_session_gap()
            .and_then(|d| d.to_std().ok())
            .unwrap_or(std::time::Duration::from_secs(60));

        let mut sweep_timer = tokio::time::interval(sweep_interval);
        sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The first tick of `interval` fires immediately; consume it so the
        // loop waits a full interval before the first sweep.
        sweep_timer.tick().await;

        loop {
            tokio::select! {
                // `biased` makes shutdown the highest-priority branch.
                biased;

                _ = self.shutdown_rx.changed() => {
                    if *self.shutdown_rx.borrow() {
                        info!("Context '{}' received shutdown signal", self.name);
                        if has_sessions {
                            // Final flush so session results are not lost.
                            if let Err(e) = self.engine.flush_expired_sessions().await {
                                error!("Context '{}' shutdown session flush error: {}", self.name, e);
                            }
                            self.drain_and_route_output();
                        }
                        break;
                    }
                }

                _ = sweep_timer.tick(), if has_sessions => {
                    match self.engine.flush_expired_sessions().await {
                        Ok(()) => {}
                        Err(e) => {
                            error!("Context '{}' session sweep error: {}", self.name, e);
                        }
                    }
                    self.drain_and_route_output();
                }

                msg = self.event_rx.recv() => {
                    match msg {
                        Some(ContextMessage::Event(event)) => {
                            self.events_processed += 1;

                            match self.engine.process_shared(Arc::clone(&event)).await {
                                Ok(()) => {}
                                Err(e) => {
                                    error!("Context '{}' processing error: {}", self.name, e);
                                }
                            }

                            self.drain_and_route_output();
                        }
                        Some(ContextMessage::CheckpointBarrier(barrier)) => {
                            self.handle_checkpoint_barrier(barrier).await;
                        }
                        Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
                            let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
                        }
                        // Inbox closed: all senders dropped, nothing more
                        // will arrive.
                        None => {
                            break;
                        }
                    }
                }
            }
        }

        // Drop our senders to peer contexts so their inboxes can close and
        // their run loops can also terminate.
        self.all_context_txs.clear();

        info!(
            "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
            self.name, self.events_processed, self.output_events_emitted
        );
    }
}
547
/// Cheaply cloneable router mapping event types to context channels.
#[derive(Clone)]
pub struct EventTypeRouter {
    // Event type -> channel of the owning context (shared, immutable).
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    // Fallback channel for event types with no explicit route.
    default_tx: mpsc::Sender<ContextMessage>,
}
558
559impl std::fmt::Debug for EventTypeRouter {
560 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561 f.debug_struct("EventTypeRouter")
562 .field("route_count", &self.routes.len())
563 .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
564 .finish_non_exhaustive()
565 }
566}
567
/// Reason a non-blocking dispatch failed; carries the undelivered message
/// so the caller can retry or drop it.
#[derive(Debug)]
pub enum DispatchError {
    /// The target context's channel was at capacity.
    ChannelFull(ContextMessage),
    /// The target context's channel was closed (receiver dropped).
    ChannelClosed(ContextMessage),
}
576
577impl EventTypeRouter {
578 pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
583 let tx = self
584 .routes
585 .get(&*event.event_type)
586 .unwrap_or(&self.default_tx);
587 let msg = ContextMessage::Event(event);
588 match tx.try_send(msg) {
589 Ok(()) => Ok(()),
590 Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
591 Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
592 }
593 }
594
595 pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
599 let event_type = event.event_type.clone();
600 let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
601 tx.send(ContextMessage::Event(event))
602 .await
603 .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
604 }
605
606 pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
608 let mut errors = Vec::new();
609 for event in events {
610 if let Err(e) = self.dispatch(event) {
611 errors.push(e);
612 }
613 }
614 errors
615 }
616}
617
/// Owns all context threads and channels; entry point for event dispatch,
/// shutdown, and checkpoint coordination.
pub struct ContextOrchestrator {
    // Input channel for each context, keyed by context name.
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    // Join handles for the spawned context threads.
    handles: Vec<std::thread::JoinHandle<()>>,
    // Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    // Broadcasts the shutdown signal to all context runtimes.
    shutdown_tx: watch::Sender<bool>,
    // Router used to dispatch incoming events to contexts.
    router: EventTypeRouter,
    // Present only when checkpointing was configured at build time.
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
636
637impl std::fmt::Debug for ContextOrchestrator {
638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639 f.debug_struct("ContextOrchestrator")
640 .field("context_count", &self.context_txs.len())
641 .field("ingress_routing", &self.ingress_routing)
642 .field("handle_count", &self.handles.len())
643 .finish_non_exhaustive()
644 }
645}
646
647impl ContextOrchestrator {
648 pub fn build(
657 context_map: &ContextMap,
658 program: &Program,
659 output_tx: mpsc::Sender<Event>,
660 channel_capacity: usize,
661 ) -> Result<Self, String> {
662 Self::build_with_checkpoint(
663 context_map,
664 program,
665 output_tx,
666 channel_capacity,
667 None,
668 None,
669 )
670 }
671
672 pub fn build_with_checkpoint(
674 context_map: &ContextMap,
675 program: &Program,
676 output_tx: mpsc::Sender<Event>,
677 channel_capacity: usize,
678 checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
679 recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
680 ) -> Result<Self, String> {
681 let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
682 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
683
684 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
686
687 let default_context = context_map
689 .contexts()
690 .keys()
691 .next()
692 .cloned()
693 .unwrap_or_else(|| "default".to_string());
694
695 let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
697 FxHashMap::default();
698 for ctx_name in context_map.contexts().keys() {
699 let (tx, rx) = mpsc::channel(channel_capacity);
700 context_txs.insert(ctx_name.clone(), tx);
701 context_rxs.insert(ctx_name.clone(), rx);
702 }
703
704 let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
706 let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
707 let manager = CheckpointManager::new(store, config)
708 .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
709 .unwrap();
710 CheckpointCoordinator::new(manager, context_names.clone())
711 });
712 let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
713
714 let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
716
717 for stmt in &program.statements {
719 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
720 if let Some(ctx_name) = context_map.stream_context(name) {
721 let event_types = Self::event_types_from_source(source);
722 for et in event_types {
723 ingress_routing.insert(et, ctx_name.to_string());
724 }
725 }
726 }
727 }
728
729 for stmt in &program.statements {
731 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
732 if let Some(ctx_name) = context_map.stream_context(name) {
733 if let StreamSource::Ident(source_stream) = source {
734 if context_map.stream_context(source_stream).is_some() {
735 ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
736 }
737 }
738 }
739 }
740 }
741
742 for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
744 if !context_txs.contains_key(target_ctx) {
745 warn!(
746 "Cross-context emit targets unknown context '{}'",
747 target_ctx
748 );
749 }
750 }
751
752 let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
754 FxHashMap::default();
755 for (event_type, ctx_name) in &ingress_routing {
756 if let Some(tx) = context_txs.get(ctx_name) {
757 event_type_txs.insert(event_type.clone(), tx.clone());
758 }
759 }
760 let default_tx = context_txs
761 .get(&default_context)
762 .cloned()
763 .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
764 let router = EventTypeRouter {
765 routes: Arc::new(event_type_txs),
766 default_tx,
767 };
768
769 let context_map_clone = context_map.clone();
771
772 let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
774 .map(|cp| cp.context_states.clone())
775 .unwrap_or_default();
776
777 for (ctx_name, config) in context_map.contexts() {
779 let rx = context_rxs
780 .remove(ctx_name)
781 .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
782
783 let ctx_output_tx = output_tx.clone();
784 let ctx_name_clone = ctx_name.clone();
785 let cores = config.cores.clone();
786
787 let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
789 .iter()
790 .map(|(k, v)| (k.clone(), v.clone()))
791 .collect();
792
793 let filtered_program =
795 filter_program_for_context(program, ctx_name, &context_map_clone);
796 let ingress_routing_clone = ingress_routing.clone();
797 let shutdown_rx = shutdown_tx.subscribe();
798 let ctx_ack_tx = ack_tx.clone();
799 let ctx_recovery = recovery_states.get(ctx_name).cloned();
800
801 let handle = std::thread::Builder::new()
802 .name(format!("varpulis-ctx-{ctx_name}"))
803 .spawn(move || {
804 if let Some(ref core_ids) = cores {
806 Self::set_cpu_affinity(&ctx_name_clone, core_ids);
807 }
808
809 let rt = tokio::runtime::Builder::new_current_thread()
811 .enable_all()
812 .build()
813 .expect("Failed to create Tokio runtime for context");
814
815 rt.block_on(async move {
816 let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
818 let mut engine = Engine::new(engine_output_tx);
819 engine.set_context_name(&ctx_name_clone);
820 if let Err(e) = engine.load(&filtered_program) {
821 error!(
822 "Failed to load program for context '{}': {}",
823 ctx_name_clone, e
824 );
825 return;
826 }
827
828 if let Err(e) = engine.connect_sinks().await {
830 error!(
831 "Failed to connect sinks for context '{}': {}",
832 ctx_name_clone, e
833 );
834 return;
835 }
836
837 if let Some(cp) = ctx_recovery {
839 if let Err(e) = engine.restore_checkpoint(&cp) {
840 tracing::error!(
841 "Context {} failed to restore checkpoint: {}",
842 ctx_name_clone,
843 e
844 );
845 return;
846 }
847 }
848
849 let mut ctx_runtime = ContextRuntime::new(
850 ctx_name_clone,
851 engine,
852 ctx_output_tx,
853 rx,
854 engine_output_rx,
855 all_txs,
856 ingress_routing_clone,
857 shutdown_rx,
858 );
859
860 if let Some(ack_tx) = ctx_ack_tx {
861 ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
862 }
863
864 ctx_runtime.run().await;
865 });
866 })
867 .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
868
869 handles.push(handle);
870 }
871
872 Ok(Self {
873 context_txs,
874 handles,
875 ingress_routing,
876 shutdown_tx,
877 router,
878 checkpoint_coordinator,
879 })
880 }
881
882 pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
884 self.router.dispatch_await(event).await
885 }
886
887 pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
889 self.router.dispatch(event)
890 }
891
892 pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
894 self.router.dispatch_batch(events)
895 }
896
897 pub fn router(&self) -> EventTypeRouter {
899 self.router.clone()
900 }
901
902 pub fn shutdown(self) {
906 let _ = self.shutdown_tx.send(true);
908
909 drop(self.context_txs);
911
912 for handle in self.handles {
914 if let Err(e) = handle.join() {
915 error!("Context thread panicked: {:?}", e);
916 }
917 }
918
919 info!("All context runtimes shut down");
920 }
921
922 pub fn trigger_checkpoint(&mut self) {
928 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
929 coordinator.initiate(&self.context_txs);
930 }
931 }
932
933 pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
937 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
938 let had_pending = coordinator.has_pending();
939 coordinator.try_complete()?;
940 Ok(had_pending && !coordinator.has_pending())
942 } else {
943 Ok(false)
944 }
945 }
946
947 pub fn should_checkpoint(&self) -> bool {
949 self.checkpoint_coordinator
950 .as_ref()
951 .is_some_and(|c| c.should_checkpoint())
952 }
953
954 pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
958 if self.should_checkpoint() {
959 self.trigger_checkpoint();
960 }
961 self.try_complete_checkpoint()
962 }
963
964 pub fn context_names(&self) -> Vec<&str> {
966 self.context_txs.keys().map(|s| s.as_str()).collect()
967 }
968
969 pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
971 &self.ingress_routing
972 }
973
974 fn event_types_from_source(source: &StreamSource) -> Vec<String> {
976 match source {
977 StreamSource::Ident(name) => vec![name.clone()],
978 StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
979 StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
980 StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
981 StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
982 StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
983 StreamSource::Sequence(decl) => {
984 decl.steps.iter().map(|s| s.event_type.clone()).collect()
985 }
986 StreamSource::Timer(_) => vec![],
987 }
988 }
989
990 fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
992 #[cfg(target_os = "linux")]
993 {
994 use core_affinity::CoreId;
995 if let Some(&first_core) = core_ids.first() {
996 let core_id = CoreId { id: first_core };
997 if core_affinity::set_for_current(core_id) {
998 info!("Context '{}' pinned to core {}", ctx_name, first_core);
999 } else {
1000 warn!(
1001 "Failed to pin context '{}' to core {}",
1002 ctx_name, first_core
1003 );
1004 }
1005 }
1006 }
1007
1008 #[cfg(not(target_os = "linux"))]
1009 {
1010 tracing::debug!(
1011 "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
1012 ctx_name,
1013 core_ids
1014 );
1015 }
1016 }
1017}
1018
// Unit tests: context map bookkeeping, program filtering, ingress routing,
// and (on Linux) CPU-affinity introspection.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh map has no contexts and an empty registry.
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    // Registering a context stores it, keyed by name, with its core list.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    // Stream lookups return the assigned context, or None when unassigned.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    // Cross-context emits are keyed by (stream name, emit index).
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // A program with no contexts declared keeps working (no-context mode).
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    // A context config without core pinning is valid.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    // Multiple contexts coexist, each with independent stream assignments.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    // An Ident source yields exactly its own name as the event type.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    // Filtering keeps only this context's streams while preserving all
    // non-stream statements (e.g. context declarations).
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    // A stream derived from another context's stream routes the upstream
    // stream name to the downstream context (second-pass routing).
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    // Cross-context emit registration is independent of orchestration.
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // /proc/self/status parsing yields a non-empty allowed-core list.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}