1use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Instant;
13
14use rustc_hash::FxHashMap;
15use tokio::sync::{mpsc, watch};
16use tracing::{error, info, warn};
17use varpulis_core::ast::{Program, Stmt, StreamSource};
18
19use crate::engine::Engine;
20use crate::event::{Event, SharedEvent};
21use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
22
/// Message delivered to a context runtime's ingress channel.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// A reference-counted event for the context's engine to process.
    Event(SharedEvent),
    /// Checkpoint barrier: the context snapshots its engine state and acks.
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark advance propagated from another context.
    WatermarkUpdate {
        /// Name of the context whose watermark advanced.
        source_context: String,
        /// New watermark position, in epoch milliseconds.
        watermark_ms: i64,
    },
}
38
/// Barrier broadcast to every context to trigger a coordinated checkpoint.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Monotonically increasing checkpoint identifier.
    pub checkpoint_id: u64,
    /// Wall-clock time the checkpoint was initiated, epoch milliseconds.
    pub timestamp_ms: i64,
}
45
/// A context's acknowledgement of a checkpoint barrier, carrying that
/// context's engine snapshot.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acking context.
    pub context_name: String,
    /// Identifier of the checkpoint being acknowledged.
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state at the barrier.
    pub engine_checkpoint: EngineCheckpoint,
}
53
/// Bookkeeping for a checkpoint that is still waiting on context acks.
struct PendingCheckpoint {
    checkpoint_id: u64,
    timestamp_ms: i64,
    // Acks received so far, keyed by context name.
    acks: HashMap<String, EngineCheckpoint>,
    // Used to warn when a checkpoint stays pending too long.
    started_at: Instant,
}
61
/// Coordinates barrier-based checkpoints across context runtimes:
/// broadcasts barriers, collects per-context acks, and persists the
/// assembled checkpoint through the manager.
pub struct CheckpointCoordinator {
    manager: CheckpointManager,
    // Cloned out via `ack_sender` so each context can send acks.
    ack_tx: mpsc::Sender<CheckpointAck>,
    ack_rx: mpsc::Receiver<CheckpointAck>,
    // Every context expected to ack each checkpoint.
    context_names: Vec<String>,
    // The in-flight checkpoint, if any (at most one at a time).
    pending: Option<PendingCheckpoint>,
    next_checkpoint_id: u64,
}
74
// Compact manual Debug view: summary fields only; the manager and the
// channel halves are omitted.
impl std::fmt::Debug for CheckpointCoordinator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CheckpointCoordinator")
            .field("context_names", &self.context_names)
            .field("next_checkpoint_id", &self.next_checkpoint_id)
            .field("has_pending", &self.pending.is_some())
            .finish_non_exhaustive()
    }
}
84
85impl CheckpointCoordinator {
86 pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
88 let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
89 Self {
90 manager,
91 ack_tx,
92 ack_rx,
93 context_names,
94 pending: None,
95 next_checkpoint_id: 1,
96 }
97 }
98
99 pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
101 self.ack_tx.clone()
102 }
103
104 pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
106 if self.pending.is_some() {
107 warn!("Checkpoint already in progress, skipping initiation");
108 return;
109 }
110
111 let checkpoint_id = self.next_checkpoint_id;
112 self.next_checkpoint_id += 1;
113 let timestamp_ms = chrono::Utc::now().timestamp_millis();
114
115 let barrier = CheckpointBarrier {
116 checkpoint_id,
117 timestamp_ms,
118 };
119
120 for (ctx_name, tx) in context_txs {
121 if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
122 error!(
123 "Failed to send checkpoint barrier to context '{}': {}",
124 ctx_name, e
125 );
126 }
127 }
128
129 self.pending = Some(PendingCheckpoint {
130 checkpoint_id,
131 timestamp_ms,
132 acks: HashMap::new(),
133 started_at: Instant::now(),
134 });
135
136 info!("Initiated checkpoint {}", checkpoint_id);
137 }
138
139 pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
141 let pending = self.pending.as_mut()?;
142
143 if ack.checkpoint_id != pending.checkpoint_id {
144 warn!(
145 "Received ack for checkpoint {} but expecting {}",
146 ack.checkpoint_id, pending.checkpoint_id
147 );
148 return None;
149 }
150
151 pending.acks.insert(ack.context_name, ack.engine_checkpoint);
152
153 if pending.acks.len() == self.context_names.len() {
154 let pending = self.pending.take().unwrap();
155 let mut context_states = HashMap::new();
156 for (name, cp) in pending.acks {
157 context_states.insert(name, cp);
158 }
159
160 Some(crate::persistence::Checkpoint {
161 id: pending.checkpoint_id,
162 timestamp_ms: pending.timestamp_ms,
163 events_processed: 0, window_states: HashMap::new(),
165 pattern_states: HashMap::new(),
166 metadata: HashMap::new(),
167 context_states,
168 })
169 } else {
170 None
171 }
172 }
173
174 pub fn try_complete(&mut self) -> Result<(), StoreError> {
176 while let Ok(ack) = self.ack_rx.try_recv() {
177 if let Some(checkpoint) = self.receive_ack(ack) {
178 self.manager.checkpoint(checkpoint)?;
179 return Ok(());
180 }
181 }
182
183 if let Some(ref pending) = self.pending {
185 if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
186 warn!(
187 "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
188 pending.checkpoint_id,
189 pending.started_at.elapsed().as_secs_f64()
190 );
191 }
192 }
193
194 Ok(())
195 }
196
197 pub fn should_checkpoint(&self) -> bool {
199 self.pending.is_none() && self.manager.should_checkpoint()
200 }
201
202 pub const fn has_pending(&self) -> bool {
204 self.pending.is_some()
205 }
206}
207
/// Configuration for a single execution context: its name and the CPU
/// cores it should be pinned to, if any.
#[derive(Debug, Clone)]
pub struct ContextConfig {
    pub name: String,
    pub cores: Option<Vec<usize>>,
}

/// Routing tables built at program-load time: which context owns each
/// stream, and which emits cross context boundaries.
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    // Registered contexts, keyed by context name.
    contexts: HashMap<String, ContextConfig>,
    // Stream name -> owning context name.
    stream_assignments: HashMap<String, String>,
    // (stream name, emit index) -> target context name.
    cross_context_emits: HashMap<(String, usize), String>,
}

impl ContextMap {
    /// Creates an empty map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers (or replaces) a context under its configured name.
    pub fn register_context(&mut self, config: ContextConfig) {
        let key = config.name.clone();
        self.contexts.insert(key, config);
    }

    /// Assigns a stream to a context, replacing any earlier assignment.
    pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
        self.stream_assignments.insert(stream_name, context_name);
    }

    /// Records that emit `emit_index` of `stream_name` targets `target_context`.
    pub fn add_cross_context_emit(
        &mut self,
        stream_name: String,
        emit_index: usize,
        target_context: String,
    ) {
        let key = (stream_name, emit_index);
        self.cross_context_emits.insert(key, target_context);
    }

    /// True once at least one context has been registered.
    pub fn has_contexts(&self) -> bool {
        !self.contexts.is_empty()
    }

    /// All registered contexts, keyed by name.
    pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
        &self.contexts
    }

    /// The context that owns `stream_name`, if it has been assigned.
    pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
        self.stream_assignments.get(stream_name).map(String::as_str)
    }

    /// The full stream -> context assignment table.
    pub const fn stream_assignments(&self) -> &HashMap<String, String> {
        &self.stream_assignments
    }

    /// The full cross-context emit table.
    pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
        &self.cross_context_emits
    }
}
280
281pub fn filter_program_for_context(
288 program: &Program,
289 context_name: &str,
290 context_map: &ContextMap,
291) -> Program {
292 let filtered_statements = program
293 .statements
294 .iter()
295 .filter(|stmt| {
296 if let Stmt::StreamDecl { name, .. } = &stmt.node {
297 match context_map.stream_context(name) {
299 Some(ctx) => ctx == context_name,
300 None => true,
302 }
303 } else {
304 true
306 }
307 })
308 .cloned()
309 .collect();
310
311 Program {
312 statements: filtered_statements,
313 }
314}
315
/// Reads the set of CPU cores the current process is allowed to run on,
/// from the `Cpus_allowed_list` line of `/proc/self/status` (Linux only).
///
/// The list uses the kernel's cpulist format, e.g. `0-3,7`. Returns `None`
/// when the file cannot be read or the line is absent; unparsable parts of
/// the list are skipped.
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    // `strip_prefix` replaces the starts_with + split(':') dance and
    // yields the value part directly.
    let list_str = status
        .lines()
        .find_map(|line| line.strip_prefix("Cpus_allowed_list:"))?
        .trim();

    let mut cores = Vec::new();
    for part in list_str.split(',') {
        let part = part.trim();
        if let Some((start, end)) = part.split_once('-') {
            // Range entry like "0-3": expand inclusively.
            if let (Ok(s), Ok(e)) = (start.parse::<usize>(), end.parse::<usize>()) {
                cores.extend(s..=e);
            }
        } else if let Ok(core) = part.parse::<usize>() {
            // Single-core entry like "7".
            cores.push(core);
        }
    }
    Some(cores)
}
344
/// Per-context event loop state: the embedded engine plus every channel
/// connecting this context to the orchestrator and its sibling contexts.
pub struct ContextRuntime {
    name: String,
    engine: Engine,
    // Global output channel shared by all contexts.
    output_tx: mpsc::Sender<Event>,
    // Ingress: events, barriers, and watermarks destined for this context.
    event_rx: mpsc::Receiver<ContextMessage>,
    // Events emitted by this context's engine, to be routed onward.
    engine_output_rx: mpsc::Receiver<Event>,
    // Senders into every context's ingress, for cross-context routing.
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    // Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    // Becomes `true` when the orchestrator signals shutdown.
    shutdown_rx: watch::Receiver<bool>,
    // Present only when checkpointing is enabled (see `with_ack_sender`).
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    events_processed: u64,
    output_events_emitted: u64,
}
369
// Compact manual Debug view: summary fields only; the engine and the
// channels are omitted.
impl std::fmt::Debug for ContextRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContextRuntime")
            .field("name", &self.name)
            .field("ingress_routing", &self.ingress_routing)
            .field("events_processed", &self.events_processed)
            .field("output_events_emitted", &self.output_events_emitted)
            .finish_non_exhaustive()
    }
}
380
381impl ContextRuntime {
382 #[allow(clippy::too_many_arguments)]
384 pub const fn new(
385 name: String,
386 engine: Engine,
387 output_tx: mpsc::Sender<Event>,
388 event_rx: mpsc::Receiver<ContextMessage>,
389 engine_output_rx: mpsc::Receiver<Event>,
390 all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
391 ingress_routing: FxHashMap<String, String>,
392 shutdown_rx: watch::Receiver<bool>,
393 ) -> Self {
394 Self {
395 name,
396 engine,
397 output_tx,
398 event_rx,
399 engine_output_rx,
400 all_context_txs,
401 ingress_routing,
402 shutdown_rx,
403 ack_tx: None,
404 events_processed: 0,
405 output_events_emitted: 0,
406 }
407 }
408
409 pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
411 self.ack_tx = Some(ack_tx);
412 self
413 }
414
415 fn drain_and_route_output(&mut self) {
418 while let Ok(output_event) = self.engine_output_rx.try_recv() {
419 self.output_events_emitted += 1;
420
421 if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
423 if let Some(tx) = self.all_context_txs.get(target_ctx) {
424 let shared = Arc::new(output_event);
425 let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
426 let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
428 let _ = self.output_tx.try_send(owned);
429 continue;
430 }
431 }
432
433 let _ = self.output_tx.try_send(output_event);
435 }
436 }
437
438 async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
440 if let Some(ref ack_tx) = self.ack_tx {
441 let checkpoint = self.engine.create_checkpoint();
442 let _ = ack_tx
443 .send(CheckpointAck {
444 context_name: self.name.clone(),
445 checkpoint_id: barrier.checkpoint_id,
446 engine_checkpoint: checkpoint,
447 })
448 .await;
449 }
450 }
451
452 pub async fn run(&mut self) {
461 info!("Context '{}' runtime started", self.name);
462
463 #[cfg(target_os = "linux")]
464 if let Some(cores) = verify_cpu_affinity() {
465 info!("Context '{}' running on cores {:?}", self.name, cores);
466 }
467
468 let has_sessions = self.engine.has_session_windows();
470 let sweep_interval = self
471 .engine
472 .min_session_gap()
473 .and_then(|d| d.to_std().ok())
474 .unwrap_or(std::time::Duration::from_secs(60));
475
476 let mut sweep_timer = tokio::time::interval(sweep_interval);
477 sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
478 sweep_timer.tick().await;
480
481 loop {
482 tokio::select! {
483 biased;
484
485 _ = self.shutdown_rx.changed() => {
486 if *self.shutdown_rx.borrow() {
487 info!("Context '{}' received shutdown signal", self.name);
488 if has_sessions {
490 if let Err(e) = self.engine.flush_expired_sessions().await {
491 error!("Context '{}' shutdown session flush error: {}", self.name, e);
492 }
493 self.drain_and_route_output();
494 }
495 break;
496 }
497 }
498
499 _ = sweep_timer.tick(), if has_sessions => {
500 match self.engine.flush_expired_sessions().await {
501 Ok(()) => {}
502 Err(e) => {
503 error!("Context '{}' session sweep error: {}", self.name, e);
504 }
505 }
506 self.drain_and_route_output();
507 }
508
509 msg = self.event_rx.recv() => {
510 match msg {
511 Some(ContextMessage::Event(event)) => {
512 self.events_processed += 1;
513
514 match self.engine.process_shared(Arc::clone(&event)).await {
516 Ok(()) => {}
517 Err(e) => {
518 error!("Context '{}' processing error: {}", self.name, e);
519 }
520 }
521
522 self.drain_and_route_output();
523 }
524 Some(ContextMessage::CheckpointBarrier(barrier)) => {
525 self.handle_checkpoint_barrier(barrier).await;
526 }
527 Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
528 let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
530 }
531 None => {
532 break;
534 }
535 }
536 }
537 }
538 }
539
540 self.all_context_txs.clear();
542
543 info!(
544 "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
545 self.name, self.events_processed, self.output_events_emitted
546 );
547 }
548}
549
/// Cheap-to-clone dispatcher mapping event types to context ingress
/// channels, with a default channel for unrouted types.
#[derive(Clone)]
pub struct EventTypeRouter {
    // Shared routing table: event type -> that context's ingress sender.
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    // Fallback sender for event types with no explicit route.
    default_tx: mpsc::Sender<ContextMessage>,
}
560
// Manual Debug: shows the route count and keys rather than the senders.
impl std::fmt::Debug for EventTypeRouter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventTypeRouter")
            .field("route_count", &self.routes.len())
            .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
            .finish_non_exhaustive()
    }
}
569
/// Why a non-blocking dispatch failed; the undelivered message is returned
/// inside the variant so the caller can retry or drop it.
#[derive(Debug)]
pub enum DispatchError {
    /// The target context's ingress channel was at capacity.
    ChannelFull(ContextMessage),
    /// The target context's ingress channel was closed.
    ChannelClosed(ContextMessage),
}
578
579impl EventTypeRouter {
580 pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
585 let tx = self
586 .routes
587 .get(&*event.event_type)
588 .unwrap_or(&self.default_tx);
589 let msg = ContextMessage::Event(event);
590 match tx.try_send(msg) {
591 Ok(()) => Ok(()),
592 Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
593 Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
594 }
595 }
596
597 pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
601 let event_type = event.event_type.clone();
602 let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
603 tx.send(ContextMessage::Event(event))
604 .await
605 .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
606 }
607
608 pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
610 let mut errors = Vec::new();
611 for event in events {
612 if let Err(e) = self.dispatch(event) {
613 errors.push(e);
614 }
615 }
616 errors
617 }
618}
619
/// Owns all context runtimes: their threads, ingress channels, routing
/// table, shutdown signal, and (optionally) checkpoint coordination.
pub struct ContextOrchestrator {
    // Ingress sender per context, keyed by context name.
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    // One OS thread per context, each driving its own runtime.
    handles: Vec<std::thread::JoinHandle<()>>,
    // Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    // Broadcasts `true` to every runtime on shutdown.
    shutdown_tx: watch::Sender<bool>,
    // Dispatcher handed out to producers via `router()`.
    router: EventTypeRouter,
    // Present only when built with checkpoint support.
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
638
// Compact manual Debug view: counts and routing only; channels, threads,
// and the coordinator are omitted.
impl std::fmt::Debug for ContextOrchestrator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContextOrchestrator")
            .field("context_count", &self.context_txs.len())
            .field("ingress_routing", &self.ingress_routing)
            .field("handle_count", &self.handles.len())
            .finish_non_exhaustive()
    }
}
648
649impl ContextOrchestrator {
650 pub fn build(
659 context_map: &ContextMap,
660 program: &Program,
661 output_tx: mpsc::Sender<Event>,
662 channel_capacity: usize,
663 ) -> Result<Self, String> {
664 Self::build_with_checkpoint(
665 context_map,
666 program,
667 output_tx,
668 channel_capacity,
669 None,
670 None,
671 )
672 }
673
674 pub fn build_with_checkpoint(
676 context_map: &ContextMap,
677 program: &Program,
678 output_tx: mpsc::Sender<Event>,
679 channel_capacity: usize,
680 checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
681 recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
682 ) -> Result<Self, String> {
683 let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
684 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
685
686 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
688
689 let default_context = context_map
691 .contexts()
692 .keys()
693 .next()
694 .cloned()
695 .unwrap_or_else(|| "default".to_string());
696
697 let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
699 FxHashMap::default();
700 for ctx_name in context_map.contexts().keys() {
701 let (tx, rx) = mpsc::channel(channel_capacity);
702 context_txs.insert(ctx_name.clone(), tx);
703 context_rxs.insert(ctx_name.clone(), rx);
704 }
705
706 let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
708 let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
709 let manager = CheckpointManager::new(store, config)
710 .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
711 .unwrap();
712 CheckpointCoordinator::new(manager, context_names.clone())
713 });
714 let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
715
716 let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
718
719 for stmt in &program.statements {
721 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
722 if let Some(ctx_name) = context_map.stream_context(name) {
723 let event_types = Self::event_types_from_source(source);
724 for et in event_types {
725 ingress_routing.insert(et, ctx_name.to_string());
726 }
727 }
728 }
729 }
730
731 for stmt in &program.statements {
733 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
734 if let Some(ctx_name) = context_map.stream_context(name) {
735 if let StreamSource::Ident(source_stream) = source {
736 if context_map.stream_context(source_stream).is_some() {
737 ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
738 }
739 }
740 }
741 }
742 }
743
744 for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
746 if !context_txs.contains_key(target_ctx) {
747 warn!(
748 "Cross-context emit targets unknown context '{}'",
749 target_ctx
750 );
751 }
752 }
753
754 let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
756 FxHashMap::default();
757 for (event_type, ctx_name) in &ingress_routing {
758 if let Some(tx) = context_txs.get(ctx_name) {
759 event_type_txs.insert(event_type.clone(), tx.clone());
760 }
761 }
762 let default_tx = context_txs
763 .get(&default_context)
764 .cloned()
765 .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
766 let router = EventTypeRouter {
767 routes: Arc::new(event_type_txs),
768 default_tx,
769 };
770
771 let context_map_clone = context_map.clone();
773
774 let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
776 .map(|cp| cp.context_states.clone())
777 .unwrap_or_default();
778
779 for (ctx_name, config) in context_map.contexts() {
781 let rx = context_rxs
782 .remove(ctx_name)
783 .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
784
785 let ctx_output_tx = output_tx.clone();
786 let ctx_name_clone = ctx_name.clone();
787 let cores = config.cores.clone();
788
789 let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
791 .iter()
792 .map(|(k, v)| (k.clone(), v.clone()))
793 .collect();
794
795 let filtered_program =
797 filter_program_for_context(program, ctx_name, &context_map_clone);
798 let ingress_routing_clone = ingress_routing.clone();
799 let shutdown_rx = shutdown_tx.subscribe();
800 let ctx_ack_tx = ack_tx.clone();
801 let ctx_recovery = recovery_states.get(ctx_name).cloned();
802
803 let handle = std::thread::Builder::new()
804 .name(format!("varpulis-ctx-{ctx_name}"))
805 .spawn(move || {
806 if let Some(ref core_ids) = cores {
808 Self::set_cpu_affinity(&ctx_name_clone, core_ids);
809 }
810
811 let rt = tokio::runtime::Builder::new_current_thread()
813 .enable_all()
814 .build()
815 .expect("Failed to create Tokio runtime for context");
816
817 rt.block_on(async move {
818 let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
820 let mut engine = Engine::new(engine_output_tx);
821 engine.set_context_name(&ctx_name_clone);
822 if let Err(e) = engine.load(&filtered_program) {
823 error!(
824 "Failed to load program for context '{}': {}",
825 ctx_name_clone, e
826 );
827 return;
828 }
829
830 if let Err(e) = engine.connect_sinks().await {
832 error!(
833 "Failed to connect sinks for context '{}': {}",
834 ctx_name_clone, e
835 );
836 return;
837 }
838
839 if let Some(cp) = ctx_recovery {
841 if let Err(e) = engine.restore_checkpoint(&cp) {
842 tracing::error!(
843 "Context {} failed to restore checkpoint: {}",
844 ctx_name_clone,
845 e
846 );
847 return;
848 }
849 }
850
851 let mut ctx_runtime = ContextRuntime::new(
852 ctx_name_clone,
853 engine,
854 ctx_output_tx,
855 rx,
856 engine_output_rx,
857 all_txs,
858 ingress_routing_clone,
859 shutdown_rx,
860 );
861
862 if let Some(ack_tx) = ctx_ack_tx {
863 ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
864 }
865
866 ctx_runtime.run().await;
867 });
868 })
869 .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
870
871 handles.push(handle);
872 }
873
874 Ok(Self {
875 context_txs,
876 handles,
877 ingress_routing,
878 shutdown_tx,
879 router,
880 checkpoint_coordinator,
881 })
882 }
883
884 pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
886 self.router.dispatch_await(event).await
887 }
888
889 pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
891 self.router.dispatch(event)
892 }
893
894 pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
896 self.router.dispatch_batch(events)
897 }
898
899 pub fn router(&self) -> EventTypeRouter {
901 self.router.clone()
902 }
903
904 pub fn shutdown(self) {
908 let _ = self.shutdown_tx.send(true);
910
911 drop(self.context_txs);
913
914 for handle in self.handles {
916 if let Err(e) = handle.join() {
917 error!("Context thread panicked: {:?}", e);
918 }
919 }
920
921 info!("All context runtimes shut down");
922 }
923
924 pub fn trigger_checkpoint(&mut self) {
930 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
931 coordinator.initiate(&self.context_txs);
932 }
933 }
934
935 pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
939 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
940 let had_pending = coordinator.has_pending();
941 coordinator.try_complete()?;
942 Ok(had_pending && !coordinator.has_pending())
944 } else {
945 Ok(false)
946 }
947 }
948
949 pub fn should_checkpoint(&self) -> bool {
951 self.checkpoint_coordinator
952 .as_ref()
953 .is_some_and(|c| c.should_checkpoint())
954 }
955
956 pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
960 if self.should_checkpoint() {
961 self.trigger_checkpoint();
962 }
963 self.try_complete_checkpoint()
964 }
965
966 pub fn context_names(&self) -> Vec<&str> {
968 self.context_txs.keys().map(|s| s.as_str()).collect()
969 }
970
971 pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
973 &self.ingress_routing
974 }
975
976 fn event_types_from_source(source: &StreamSource) -> Vec<String> {
978 match source {
979 StreamSource::Ident(name) => vec![name.clone()],
980 StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
981 StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
982 StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
983 StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
984 StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
985 StreamSource::Sequence(decl) => {
986 decl.steps.iter().map(|s| s.event_type.clone()).collect()
987 }
988 StreamSource::Timer(_) => vec![],
989 }
990 }
991
992 fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
994 #[cfg(target_os = "linux")]
995 {
996 use core_affinity::CoreId;
997 if let Some(&first_core) = core_ids.first() {
998 let core_id = CoreId { id: first_core };
999 if core_affinity::set_for_current(core_id) {
1000 info!("Context '{}' pinned to core {}", ctx_name, first_core);
1001 } else {
1002 warn!(
1003 "Failed to pin context '{}' to core {}",
1004 ctx_name, first_core
1005 );
1006 }
1007 }
1008 }
1009
1010 #[cfg(not(target_os = "linux"))]
1011 {
1012 tracing::debug!(
1013 "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
1014 ctx_name,
1015 core_ids
1016 );
1017 }
1018 }
1019}
1020
// Unit tests for the context map, program filtering, and orchestrator
// routing. Orchestrator tests spawn real context threads and shut them down.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh map has no contexts and an empty contexts table.
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    // Registering a context stores its config, including core pinning.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    // Stream lookups return the assigned context, or None when unassigned.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    // Cross-context emits are keyed by (stream name, emit index).
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Programs without contexts must still work (backward compatibility).
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    // Core pinning is optional in a context's config.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    // Several contexts can coexist, each owning its own streams.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    // An Ident source contributes exactly its event type to routing.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    // Filtering keeps only the streams assigned to the requested context
    // while preserving all non-stream statements.
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    // A stream consuming another stream's output must have that derived
    // event type routed to the consuming context (second routing pass).
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    // Cross-context emit declarations are retrievable exactly as recorded.
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Sanity check that /proc-based affinity introspection works on Linux.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}