1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use rustc_hash::FxHashMap;
22use tokio::sync::{mpsc, watch};
23use tracing::{error, info, warn};
24use varpulis_core::ast::{Program, Stmt, StreamSource};
25
26use crate::engine::Engine;
27use crate::event::{Event, SharedEvent};
28use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
29
/// Messages delivered to a context's input channel.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// An event for the context's engine to process.
    Event(SharedEvent),
    /// A barrier broadcast by the checkpoint coordinator; the receiving
    /// context snapshots its engine state and acknowledges it.
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark progress reported by another context.
    WatermarkUpdate {
        /// Name of the context whose watermark advanced.
        source_context: String,
        /// New watermark value, in epoch milliseconds.
        watermark_ms: i64,
    },
}
45
/// Marker injected into every context's input channel to trigger a
/// coordinated state snapshot.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Monotonically increasing checkpoint identifier.
    pub checkpoint_id: u64,
    /// Wall-clock time (epoch ms) when the checkpoint was initiated.
    pub timestamp_ms: i64,
}
52
/// A context's acknowledgement of a checkpoint barrier, carrying its
/// engine state snapshot.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acknowledging context.
    pub context_name: String,
    /// Identifier of the barrier being acknowledged.
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state at the barrier.
    pub engine_checkpoint: EngineCheckpoint,
}
60
/// Bookkeeping for a checkpoint that has been initiated but not yet
/// acknowledged by every context.
struct PendingCheckpoint {
    /// Identifier of the in-flight checkpoint.
    checkpoint_id: u64,
    /// Initiation time in epoch milliseconds.
    timestamp_ms: i64,
    /// Engine snapshots received so far, keyed by context name.
    acks: HashMap<String, EngineCheckpoint>,
    /// When the barrier was broadcast; used to warn about stalls.
    started_at: Instant,
}
68
/// Coordinates barrier-based checkpoints across all context runtimes:
/// broadcasts barriers, collects acks, and persists the assembled
/// checkpoint through the manager.
pub struct CheckpointCoordinator {
    /// Persists completed checkpoints and decides when a new one is due.
    manager: CheckpointManager,
    /// Sender handed out to contexts for returning acks.
    ack_tx: mpsc::Sender<CheckpointAck>,
    /// Receives acks back from contexts.
    ack_rx: mpsc::Receiver<CheckpointAck>,
    /// Names of all participating contexts; a checkpoint completes only
    /// when each has acked.
    context_names: Vec<String>,
    /// The checkpoint currently awaiting acks, if any.
    pending: Option<PendingCheckpoint>,
    /// Identifier to assign to the next initiated checkpoint.
    next_checkpoint_id: u64,
}
81
82impl std::fmt::Debug for CheckpointCoordinator {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.debug_struct("CheckpointCoordinator")
85 .field("context_names", &self.context_names)
86 .field("next_checkpoint_id", &self.next_checkpoint_id)
87 .field("has_pending", &self.pending.is_some())
88 .finish_non_exhaustive()
89 }
90}
91
92impl CheckpointCoordinator {
93 pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
95 let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
96 Self {
97 manager,
98 ack_tx,
99 ack_rx,
100 context_names,
101 pending: None,
102 next_checkpoint_id: 1,
103 }
104 }
105
106 pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
108 self.ack_tx.clone()
109 }
110
111 pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
113 if self.pending.is_some() {
114 warn!("Checkpoint already in progress, skipping initiation");
115 return;
116 }
117
118 let checkpoint_id = self.next_checkpoint_id;
119 self.next_checkpoint_id += 1;
120 let timestamp_ms = chrono::Utc::now().timestamp_millis();
121
122 let barrier = CheckpointBarrier {
123 checkpoint_id,
124 timestamp_ms,
125 };
126
127 for (ctx_name, tx) in context_txs {
128 if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
129 error!(
130 "Failed to send checkpoint barrier to context '{}': {}",
131 ctx_name, e
132 );
133 }
134 }
135
136 self.pending = Some(PendingCheckpoint {
137 checkpoint_id,
138 timestamp_ms,
139 acks: HashMap::new(),
140 started_at: Instant::now(),
141 });
142
143 info!("Initiated checkpoint {}", checkpoint_id);
144 }
145
146 pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
148 let pending = self.pending.as_mut()?;
149
150 if ack.checkpoint_id != pending.checkpoint_id {
151 warn!(
152 "Received ack for checkpoint {} but expecting {}",
153 ack.checkpoint_id, pending.checkpoint_id
154 );
155 return None;
156 }
157
158 pending.acks.insert(ack.context_name, ack.engine_checkpoint);
159
160 if pending.acks.len() == self.context_names.len() {
161 let pending = self.pending.take().unwrap();
162 let mut context_states = HashMap::new();
163 for (name, cp) in pending.acks {
164 context_states.insert(name, cp);
165 }
166
167 Some(crate::persistence::Checkpoint {
168 id: pending.checkpoint_id,
169 timestamp_ms: pending.timestamp_ms,
170 events_processed: 0, window_states: HashMap::new(),
172 pattern_states: HashMap::new(),
173 metadata: HashMap::new(),
174 context_states,
175 })
176 } else {
177 None
178 }
179 }
180
181 pub fn try_complete(&mut self) -> Result<(), StoreError> {
183 while let Ok(ack) = self.ack_rx.try_recv() {
184 if let Some(checkpoint) = self.receive_ack(ack) {
185 self.manager.checkpoint(checkpoint)?;
186 return Ok(());
187 }
188 }
189
190 if let Some(ref pending) = self.pending {
192 if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
193 warn!(
194 "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
195 pending.checkpoint_id,
196 pending.started_at.elapsed().as_secs_f64()
197 );
198 }
199 }
200
201 Ok(())
202 }
203
204 pub fn should_checkpoint(&self) -> bool {
206 self.pending.is_none() && self.manager.should_checkpoint()
207 }
208
209 pub const fn has_pending(&self) -> bool {
211 self.pending.is_some()
212 }
213}
214
/// Configuration for a single execution context.
#[derive(Debug, Clone)]
pub struct ContextConfig {
    /// Unique context name.
    pub name: String,
    /// CPU cores to pin the context's thread to, if any.
    pub cores: Option<Vec<usize>>,
}
221
/// Static routing description: which contexts exist, which stream runs in
/// which context, and which emits cross a context boundary.
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    /// Registered contexts, keyed by name.
    contexts: HashMap<String, ContextConfig>,
    /// Stream name -> owning context name.
    stream_assignments: HashMap<String, String>,
    /// (stream name, emit index) -> target context name.
    cross_context_emits: HashMap<(String, usize), String>,
}
235
236impl ContextMap {
237 pub fn new() -> Self {
238 Self::default()
239 }
240
241 pub fn register_context(&mut self, config: ContextConfig) {
243 self.contexts.insert(config.name.clone(), config);
244 }
245
246 pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
248 self.stream_assignments.insert(stream_name, context_name);
249 }
250
251 pub fn add_cross_context_emit(
253 &mut self,
254 stream_name: String,
255 emit_index: usize,
256 target_context: String,
257 ) {
258 self.cross_context_emits
259 .insert((stream_name, emit_index), target_context);
260 }
261
262 pub fn has_contexts(&self) -> bool {
264 !self.contexts.is_empty()
265 }
266
267 pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
269 &self.contexts
270 }
271
272 pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
274 self.stream_assignments.get(stream_name).map(|s| s.as_str())
275 }
276
277 pub const fn stream_assignments(&self) -> &HashMap<String, String> {
279 &self.stream_assignments
280 }
281
282 pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
284 &self.cross_context_emits
285 }
286}
287
288pub fn filter_program_for_context(
295 program: &Program,
296 context_name: &str,
297 context_map: &ContextMap,
298) -> Program {
299 let filtered_statements = program
300 .statements
301 .iter()
302 .filter(|stmt| {
303 if let Stmt::StreamDecl { name, .. } = &stmt.node {
304 match context_map.stream_context(name) {
306 Some(ctx) => ctx == context_name,
307 None => true,
309 }
310 } else {
311 true
313 }
314 })
315 .cloned()
316 .collect();
317
318 Program {
319 statements: filtered_statements,
320 }
321}
322
/// Reads this process's allowed CPU set from `/proc/self/status`.
///
/// Parses the `Cpus_allowed_list` line, which is a comma-separated mix of
/// single cores (`3`) and inclusive ranges (`0-2`). Returns `None` if the
/// file or the line cannot be read/parsed; silently skips malformed parts.
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    let line = status
        .lines()
        .find(|l| l.starts_with("Cpus_allowed_list:"))?;
    let list_str = line.split(':').nth(1)?.trim();

    let mut cores = Vec::new();
    for part in list_str.split(',').map(str::trim) {
        match part.split_once('-') {
            // Inclusive range, e.g. "0-3".
            Some((lo, hi)) => {
                if let (Ok(lo), Ok(hi)) = (lo.parse::<usize>(), hi.parse::<usize>()) {
                    cores.extend(lo..=hi);
                }
            }
            // Single core, e.g. "5".
            None => {
                if let Ok(core) = part.parse::<usize>() {
                    cores.push(core);
                }
            }
        }
    }
    Some(cores)
}
351
/// Per-context event loop state: owns the context's engine, its input
/// channel, and the channels used to forward output events.
pub struct ContextRuntime {
    /// Context name (used in logs and checkpoint acks).
    name: String,
    /// The engine executing this context's slice of the program.
    engine: Engine,
    /// External output channel shared by all contexts.
    output_tx: mpsc::Sender<Event>,
    /// Inbound messages: events, barriers, watermark updates.
    event_rx: mpsc::Receiver<ContextMessage>,
    /// Events emitted by this context's engine.
    engine_output_rx: mpsc::Receiver<Event>,
    /// Senders to every context, for cross-context forwarding.
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    /// Becomes `true` when the orchestrator requests shutdown.
    shutdown_rx: watch::Receiver<bool>,
    /// Checkpoint ack channel; `None` when checkpointing is disabled.
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    /// Count of inbound events processed (reported at shutdown).
    events_processed: u64,
    /// Count of engine output events forwarded (reported at shutdown).
    output_events_emitted: u64,
}
376
377impl std::fmt::Debug for ContextRuntime {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 f.debug_struct("ContextRuntime")
380 .field("name", &self.name)
381 .field("ingress_routing", &self.ingress_routing)
382 .field("events_processed", &self.events_processed)
383 .field("output_events_emitted", &self.output_events_emitted)
384 .finish_non_exhaustive()
385 }
386}
387
impl ContextRuntime {
    /// Builds a runtime for a single context.
    ///
    /// No work happens until [`run`](Self::run) is awaited. The argument
    /// count mirrors the wiring done by the orchestrator, hence the lint
    /// allowance.
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        name: String,
        engine: Engine,
        output_tx: mpsc::Sender<Event>,
        event_rx: mpsc::Receiver<ContextMessage>,
        engine_output_rx: mpsc::Receiver<Event>,
        all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
        ingress_routing: FxHashMap<String, String>,
        shutdown_rx: watch::Receiver<bool>,
    ) -> Self {
        Self {
            name,
            engine,
            output_tx,
            event_rx,
            engine_output_rx,
            all_context_txs,
            ingress_routing,
            shutdown_rx,
            ack_tx: None,
            events_processed: 0,
            output_events_emitted: 0,
        }
    }

    /// Enables checkpoint participation by wiring in the coordinator's
    /// ack channel (builder style).
    pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
        self.ack_tx = Some(ack_tx);
        self
    }

    /// Drains the engine's output queue and forwards each event.
    ///
    /// Events whose type routes to another context are forwarded there
    /// AND emitted on the external output channel; all other events go to
    /// the external output only. Every send is non-blocking (`try_send`),
    /// so a full or closed downstream channel silently drops the event.
    fn drain_and_route_output(&mut self) {
        while let Ok(output_event) = self.engine_output_rx.try_recv() {
            self.output_events_emitted += 1;

            if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
                if let Some(tx) = self.all_context_txs.get(target_ctx) {
                    let shared = Arc::new(output_event);
                    let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
                    // Recover ownership for the external output; falls back
                    // to a deep clone if the target context still holds the
                    // Arc (i.e. the try_send above succeeded).
                    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
                    let _ = self.output_tx.try_send(owned);
                    continue;
                }
            }

            let _ = self.output_tx.try_send(output_event);
        }
    }

    /// Snapshots the engine and sends the ack back to the coordinator.
    ///
    /// Silently does nothing when checkpointing is disabled (no `ack_tx`).
    async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
        if let Some(ref ack_tx) = self.ack_tx {
            let checkpoint = self.engine.create_checkpoint();
            let _ = ack_tx
                .send(CheckpointAck {
                    context_name: self.name.clone(),
                    checkpoint_id: barrier.checkpoint_id,
                    engine_checkpoint: checkpoint,
                })
                .await;
        }
    }

    /// The context's main event loop; runs until shutdown is signalled or
    /// the input channel closes.
    ///
    /// Loop priorities (the `biased` select): shutdown first, then the
    /// session-sweep timer, then inbound messages.
    pub async fn run(&mut self) {
        info!("Context '{}' runtime started", self.name);

        #[cfg(target_os = "linux")]
        if let Some(cores) = verify_cpu_affinity() {
            info!("Context '{}' running on cores {:?}", self.name, cores);
        }

        let has_sessions = self.engine.has_session_windows();
        // Sweep at the smallest session gap so no session outlives its
        // expiry by more than one interval; default to 60s when the gap
        // is unknown or not representable as a std Duration.
        let sweep_interval = self
            .engine
            .min_session_gap()
            .and_then(|d| d.to_std().ok())
            .unwrap_or(std::time::Duration::from_secs(60));

        let mut sweep_timer = tokio::time::interval(sweep_interval);
        sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The first tick of `interval` completes immediately; consume it so
        // the loop doesn't sweep right away.
        sweep_timer.tick().await;

        loop {
            tokio::select! {
                biased;

                _ = self.shutdown_rx.changed() => {
                    if *self.shutdown_rx.borrow() {
                        info!("Context '{}' received shutdown signal", self.name);
                        // Flush any open sessions so their results are not
                        // lost on shutdown.
                        if has_sessions {
                            if let Err(e) = self.engine.flush_expired_sessions().await {
                                error!("Context '{}' shutdown session flush error: {}", self.name, e);
                            }
                            self.drain_and_route_output();
                        }
                        break;
                    }
                }

                // Periodic session expiry; branch disabled entirely when the
                // program has no session windows.
                _ = sweep_timer.tick(), if has_sessions => {
                    match self.engine.flush_expired_sessions().await {
                        Ok(()) => {}
                        Err(e) => {
                            error!("Context '{}' session sweep error: {}", self.name, e);
                        }
                    }
                    self.drain_and_route_output();
                }

                msg = self.event_rx.recv() => {
                    match msg {
                        Some(ContextMessage::Event(event)) => {
                            self.events_processed += 1;

                            match self.engine.process_shared(Arc::clone(&event)).await {
                                Ok(()) => {}
                                Err(e) => {
                                    error!("Context '{}' processing error: {}", self.name, e);
                                }
                            }

                            // Forward whatever the engine produced for this
                            // event before accepting the next one.
                            self.drain_and_route_output();
                        }
                        Some(ContextMessage::CheckpointBarrier(barrier)) => {
                            self.handle_checkpoint_barrier(barrier).await;
                        }
                        Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
                            let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
                        }
                        None => {
                            // All senders dropped: no more input can arrive.
                            break;
                        }
                    }
                }
            }
        }

        // Drop our senders to peer contexts so they can observe channel
        // closure and shut down in turn.
        self.all_context_txs.clear();

        info!(
            "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
            self.name, self.events_processed, self.output_events_emitted
        );
    }
}
556
/// Cheaply cloneable router mapping an event's type to the input channel
/// of the context that consumes it; unmatched types fall back to
/// `default_tx`.
#[derive(Clone)]
pub struct EventTypeRouter {
    /// Event type -> context input channel.
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    /// Fallback channel for event types with no explicit route.
    default_tx: mpsc::Sender<ContextMessage>,
}
567
568impl std::fmt::Debug for EventTypeRouter {
569 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570 f.debug_struct("EventTypeRouter")
571 .field("route_count", &self.routes.len())
572 .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
573 .finish_non_exhaustive()
574 }
575}
576
/// Non-blocking dispatch failure; the undelivered message is returned so
/// the caller can retry or drop it.
#[derive(Debug)]
pub enum DispatchError {
    /// Target channel was at capacity.
    ChannelFull(ContextMessage),
    /// Target channel's receiver was dropped.
    ChannelClosed(ContextMessage),
}
585
586impl EventTypeRouter {
587 pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
592 let tx = self
593 .routes
594 .get(&*event.event_type)
595 .unwrap_or(&self.default_tx);
596 let msg = ContextMessage::Event(event);
597 match tx.try_send(msg) {
598 Ok(()) => Ok(()),
599 Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
600 Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
601 }
602 }
603
604 pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
608 let event_type = event.event_type.clone();
609 let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
610 tx.send(ContextMessage::Event(event))
611 .await
612 .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
613 }
614
615 pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
617 let mut errors = Vec::new();
618 for event in events {
619 if let Err(e) = self.dispatch(event) {
620 errors.push(e);
621 }
622 }
623 errors
624 }
625}
626
/// Owns all context threads and the channels into them; the entry point
/// for feeding events into the context pipeline and driving checkpoints.
pub struct ContextOrchestrator {
    /// Input channel per context, keyed by context name.
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// Join handles of the spawned context threads.
    handles: Vec<std::thread::JoinHandle<()>>,
    /// Event type -> context name routing table.
    ingress_routing: FxHashMap<String, String>,
    /// Broadcasts shutdown to all context runtimes.
    shutdown_tx: watch::Sender<bool>,
    /// Pre-built router for dispatching events by type.
    router: EventTypeRouter,
    /// Present only when checkpointing was configured at build time.
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
645
646impl std::fmt::Debug for ContextOrchestrator {
647 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
648 f.debug_struct("ContextOrchestrator")
649 .field("context_count", &self.context_txs.len())
650 .field("ingress_routing", &self.ingress_routing)
651 .field("handle_count", &self.handles.len())
652 .finish_non_exhaustive()
653 }
654}
655
656impl ContextOrchestrator {
657 pub fn build(
666 context_map: &ContextMap,
667 program: &Program,
668 output_tx: mpsc::Sender<Event>,
669 channel_capacity: usize,
670 ) -> Result<Self, String> {
671 Self::build_with_checkpoint(
672 context_map,
673 program,
674 output_tx,
675 channel_capacity,
676 None,
677 None,
678 )
679 }
680
681 pub fn build_with_checkpoint(
683 context_map: &ContextMap,
684 program: &Program,
685 output_tx: mpsc::Sender<Event>,
686 channel_capacity: usize,
687 checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
688 recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
689 ) -> Result<Self, String> {
690 let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
691 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
692
693 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
695
696 let default_context = context_map
698 .contexts()
699 .keys()
700 .next()
701 .cloned()
702 .unwrap_or_else(|| "default".to_string());
703
704 let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
706 FxHashMap::default();
707 for ctx_name in context_map.contexts().keys() {
708 let (tx, rx) = mpsc::channel(channel_capacity);
709 context_txs.insert(ctx_name.clone(), tx);
710 context_rxs.insert(ctx_name.clone(), rx);
711 }
712
713 let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
715 let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
716 let manager = CheckpointManager::new(store, config)
717 .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
718 .unwrap();
719 CheckpointCoordinator::new(manager, context_names.clone())
720 });
721 let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
722
723 let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
725
726 for stmt in &program.statements {
728 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
729 if let Some(ctx_name) = context_map.stream_context(name) {
730 let event_types = Self::event_types_from_source(source);
731 for et in event_types {
732 ingress_routing.insert(et, ctx_name.to_string());
733 }
734 }
735 }
736 }
737
738 for stmt in &program.statements {
740 if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
741 if let Some(ctx_name) = context_map.stream_context(name) {
742 if let StreamSource::Ident(source_stream) = source {
743 if context_map.stream_context(source_stream).is_some() {
744 ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
745 }
746 }
747 }
748 }
749 }
750
751 for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
753 if !context_txs.contains_key(target_ctx) {
754 warn!(
755 "Cross-context emit targets unknown context '{}'",
756 target_ctx
757 );
758 }
759 }
760
761 let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
763 FxHashMap::default();
764 for (event_type, ctx_name) in &ingress_routing {
765 if let Some(tx) = context_txs.get(ctx_name) {
766 event_type_txs.insert(event_type.clone(), tx.clone());
767 }
768 }
769 let default_tx = context_txs
770 .get(&default_context)
771 .cloned()
772 .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
773 let router = EventTypeRouter {
774 routes: Arc::new(event_type_txs),
775 default_tx,
776 };
777
778 let context_map_clone = context_map.clone();
780
781 let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
783 .map(|cp| cp.context_states.clone())
784 .unwrap_or_default();
785
786 for (ctx_name, config) in context_map.contexts() {
788 let rx = context_rxs
789 .remove(ctx_name)
790 .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
791
792 let ctx_output_tx = output_tx.clone();
793 let ctx_name_clone = ctx_name.clone();
794 let cores = config.cores.clone();
795
796 let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
798 .iter()
799 .map(|(k, v)| (k.clone(), v.clone()))
800 .collect();
801
802 let filtered_program =
804 filter_program_for_context(program, ctx_name, &context_map_clone);
805 let ingress_routing_clone = ingress_routing.clone();
806 let shutdown_rx = shutdown_tx.subscribe();
807 let ctx_ack_tx = ack_tx.clone();
808 let ctx_recovery = recovery_states.get(ctx_name).cloned();
809
810 let handle = std::thread::Builder::new()
811 .name(format!("varpulis-ctx-{ctx_name}"))
812 .spawn(move || {
813 if let Some(ref core_ids) = cores {
815 Self::set_cpu_affinity(&ctx_name_clone, core_ids);
816 }
817
818 let rt = tokio::runtime::Builder::new_current_thread()
820 .enable_all()
821 .build()
822 .expect("Failed to create Tokio runtime for context");
823
824 rt.block_on(async move {
825 let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
827 let mut engine = Engine::new(engine_output_tx);
828 engine.set_context_name(&ctx_name_clone);
829 if let Err(e) = engine.load(&filtered_program) {
830 error!(
831 "Failed to load program for context '{}': {}",
832 ctx_name_clone, e
833 );
834 return;
835 }
836
837 if let Err(e) = engine.connect_sinks().await {
839 error!(
840 "Failed to connect sinks for context '{}': {}",
841 ctx_name_clone, e
842 );
843 return;
844 }
845
846 if let Some(cp) = ctx_recovery {
848 if let Err(e) = engine.restore_checkpoint(&cp) {
849 tracing::error!(
850 "Context {} failed to restore checkpoint: {}",
851 ctx_name_clone,
852 e
853 );
854 return;
855 }
856 }
857
858 let mut ctx_runtime = ContextRuntime::new(
859 ctx_name_clone,
860 engine,
861 ctx_output_tx,
862 rx,
863 engine_output_rx,
864 all_txs,
865 ingress_routing_clone,
866 shutdown_rx,
867 );
868
869 if let Some(ack_tx) = ctx_ack_tx {
870 ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
871 }
872
873 ctx_runtime.run().await;
874 });
875 })
876 .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
877
878 handles.push(handle);
879 }
880
881 Ok(Self {
882 context_txs,
883 handles,
884 ingress_routing,
885 shutdown_tx,
886 router,
887 checkpoint_coordinator,
888 })
889 }
890
891 pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
893 self.router.dispatch_await(event).await
894 }
895
896 pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
898 self.router.dispatch(event)
899 }
900
901 pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
903 self.router.dispatch_batch(events)
904 }
905
906 pub fn router(&self) -> EventTypeRouter {
908 self.router.clone()
909 }
910
911 pub fn shutdown(self) {
915 let _ = self.shutdown_tx.send(true);
917
918 drop(self.context_txs);
920
921 for handle in self.handles {
923 if let Err(e) = handle.join() {
924 error!("Context thread panicked: {:?}", e);
925 }
926 }
927
928 info!("All context runtimes shut down");
929 }
930
931 pub fn trigger_checkpoint(&mut self) {
937 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
938 coordinator.initiate(&self.context_txs);
939 }
940 }
941
942 pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
946 if let Some(ref mut coordinator) = self.checkpoint_coordinator {
947 let had_pending = coordinator.has_pending();
948 coordinator.try_complete()?;
949 Ok(had_pending && !coordinator.has_pending())
951 } else {
952 Ok(false)
953 }
954 }
955
956 pub fn should_checkpoint(&self) -> bool {
958 self.checkpoint_coordinator
959 .as_ref()
960 .is_some_and(|c| c.should_checkpoint())
961 }
962
963 pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
967 if self.should_checkpoint() {
968 self.trigger_checkpoint();
969 }
970 self.try_complete_checkpoint()
971 }
972
973 pub fn context_names(&self) -> Vec<&str> {
975 self.context_txs.keys().map(|s| s.as_str()).collect()
976 }
977
978 pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
980 &self.ingress_routing
981 }
982
983 fn event_types_from_source(source: &StreamSource) -> Vec<String> {
985 match source {
986 StreamSource::Ident(name) => vec![name.clone()],
987 StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
988 StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
989 StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
990 StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
991 StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
992 StreamSource::Sequence(decl) => {
993 decl.steps.iter().map(|s| s.event_type.clone()).collect()
994 }
995 StreamSource::Timer(_) => vec![],
996 }
997 }
998
999 fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
1001 #[cfg(target_os = "linux")]
1002 {
1003 use core_affinity::CoreId;
1004 if let Some(&first_core) = core_ids.first() {
1005 let core_id = CoreId { id: first_core };
1006 if core_affinity::set_for_current(core_id) {
1007 info!("Context '{}' pinned to core {}", ctx_name, first_core);
1008 } else {
1009 warn!(
1010 "Failed to pin context '{}' to core {}",
1011 ctx_name, first_core
1012 );
1013 }
1014 }
1015 }
1016
1017 #[cfg(not(target_os = "linux"))]
1018 {
1019 tracing::debug!(
1020 "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
1021 ctx_name,
1022 core_ids
1023 );
1024 }
1025 }
1026}
1027
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created map has no contexts.
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    /// Registering a context stores it under its name with its config.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    /// Stream lookups return the assigned context, or None if unassigned.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    /// Cross-context emits are keyed by (stream name, emit index).
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    /// A program without contexts still works (backward compatibility).
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    /// A context config may omit core pinning.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    /// Multiple contexts with distinct core sets and stream assignments.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    /// An Ident source yields exactly its event type.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    /// Filtering keeps only the target context's streams but preserves
    /// all ContextDecl statements.
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    /// Building the orchestrator routes both source event types and
    /// derived stream names (second routing pass) to the right contexts.
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        // SensorReading feeds RawData (ingest); RawData's output feeds
        // Analysis (analytics).
        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    /// Cross-context emit declarations are recorded and retrievable.
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    /// On Linux, the process's allowed-CPU list should be readable and
    /// non-empty.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}