// varpulis_runtime/context.rs
1//! Context-based multi-threaded execution architecture.
2//!
3//! Named contexts provide isolated execution domains. Each context runs on its own
4//! OS thread with a single-threaded Tokio runtime, enabling true parallelism without
5//! locks within a context. Cross-context communication uses bounded `mpsc` channels.
6//!
7//! When no contexts are declared, the engine runs in single-threaded mode with zero
8//! overhead (backward compatible).
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Instant;
13
14use rustc_hash::FxHashMap;
15use tokio::sync::{mpsc, watch};
16use tracing::{error, info, warn};
17use varpulis_core::ast::{Program, Stmt, StreamSource};
18
19use crate::engine::Engine;
20use crate::event::{Event, SharedEvent};
21use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
22
/// Messages sent through context channels.
///
/// Wraps either a regular event or a checkpoint barrier for exactly-once semantics.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// A regular event to process (`Arc`-shared so cross-context forwarding is zero-copy)
    Event(SharedEvent),
    /// A checkpoint barrier — triggers state snapshot
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark update from an upstream context
    WatermarkUpdate {
        /// Name of the context that produced the watermark
        source_context: String,
        /// Watermark position in milliseconds (epoch-based, matching the
        /// `chrono` millisecond timestamps used elsewhere in this module)
        watermark_ms: i64,
    },
}
38
/// A checkpoint barrier flowing through the context DAG.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Monotonically increasing id assigned by the `CheckpointCoordinator`
    pub checkpoint_id: u64,
    /// Wall-clock time the checkpoint was initiated, in epoch milliseconds
    pub timestamp_ms: i64,
}
45
/// Acknowledgment from a context after completing a checkpoint.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acknowledging context
    pub context_name: String,
    /// Id of the checkpoint being acknowledged (must match the pending one)
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state for this checkpoint
    pub engine_checkpoint: EngineCheckpoint,
}
53
/// Tracks a pending coordinated checkpoint across all contexts.
struct PendingCheckpoint {
    /// Id of the in-flight checkpoint
    checkpoint_id: u64,
    /// Initiation time in epoch milliseconds (copied into the final `Checkpoint`)
    timestamp_ms: i64,
    /// Per-context snapshots received so far, keyed by context name
    acks: HashMap<String, EngineCheckpoint>,
    /// When the checkpoint was initiated — used to warn on stalls (see `try_complete`)
    started_at: Instant,
}
61
/// Coordinates checkpoints across multiple contexts.
///
/// Sends `CheckpointBarrier` to all contexts, collects `CheckpointAck` responses,
/// and persists the assembled `Checkpoint` once all contexts have acknowledged.
pub struct CheckpointCoordinator {
    /// Persists completed checkpoints and decides when a new one is due
    manager: CheckpointManager,
    /// Sender handed out (cloned) to every context via `ack_sender()`
    ack_tx: mpsc::Sender<CheckpointAck>,
    /// Receives acks drained by `try_complete()`
    ack_rx: mpsc::Receiver<CheckpointAck>,
    /// All context names that must ack before a checkpoint completes
    context_names: Vec<String>,
    /// At most one checkpoint can be in flight at a time
    pending: Option<PendingCheckpoint>,
    /// Next id to assign; starts at 1 and only increases
    next_checkpoint_id: u64,
}
74
/// Manual `Debug` impl: reports only summary fields and omits the manager
/// and channel endpoints (`finish_non_exhaustive` marks the omission).
impl std::fmt::Debug for CheckpointCoordinator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CheckpointCoordinator")
            .field("context_names", &self.context_names)
            .field("next_checkpoint_id", &self.next_checkpoint_id)
            .field("has_pending", &self.pending.is_some())
            .finish_non_exhaustive()
    }
}
84
85impl CheckpointCoordinator {
86    /// Create a new coordinator for the given contexts.
87    pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
88        let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
89        Self {
90            manager,
91            ack_tx,
92            ack_rx,
93            context_names,
94            pending: None,
95            next_checkpoint_id: 1,
96        }
97    }
98
99    /// Get a sender for checkpoint acknowledgments (cloned into each context).
100    pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
101        self.ack_tx.clone()
102    }
103
104    /// Initiate a new checkpoint by sending barriers to all contexts.
105    pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
106        if self.pending.is_some() {
107            warn!("Checkpoint already in progress, skipping initiation");
108            return;
109        }
110
111        let checkpoint_id = self.next_checkpoint_id;
112        self.next_checkpoint_id += 1;
113        let timestamp_ms = chrono::Utc::now().timestamp_millis();
114
115        let barrier = CheckpointBarrier {
116            checkpoint_id,
117            timestamp_ms,
118        };
119
120        for (ctx_name, tx) in context_txs {
121            if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
122                error!(
123                    "Failed to send checkpoint barrier to context '{}': {}",
124                    ctx_name, e
125                );
126            }
127        }
128
129        self.pending = Some(PendingCheckpoint {
130            checkpoint_id,
131            timestamp_ms,
132            acks: HashMap::new(),
133            started_at: Instant::now(),
134        });
135
136        info!("Initiated checkpoint {}", checkpoint_id);
137    }
138
139    /// Receive an acknowledgment. Returns a complete `Checkpoint` when all contexts have acked.
140    pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
141        let pending = self.pending.as_mut()?;
142
143        if ack.checkpoint_id != pending.checkpoint_id {
144            warn!(
145                "Received ack for checkpoint {} but expecting {}",
146                ack.checkpoint_id, pending.checkpoint_id
147            );
148            return None;
149        }
150
151        pending.acks.insert(ack.context_name, ack.engine_checkpoint);
152
153        if pending.acks.len() == self.context_names.len() {
154            let pending = self.pending.take().unwrap();
155            let mut context_states = HashMap::new();
156            for (name, cp) in pending.acks {
157                context_states.insert(name, cp);
158            }
159
160            Some(crate::persistence::Checkpoint {
161                id: pending.checkpoint_id,
162                timestamp_ms: pending.timestamp_ms,
163                events_processed: 0, // Filled from context states
164                window_states: HashMap::new(),
165                pattern_states: HashMap::new(),
166                metadata: HashMap::new(),
167                context_states,
168            })
169        } else {
170            None
171        }
172    }
173
174    /// Try to drain pending acks and complete the checkpoint.
175    pub fn try_complete(&mut self) -> Result<(), StoreError> {
176        while let Ok(ack) = self.ack_rx.try_recv() {
177            if let Some(checkpoint) = self.receive_ack(ack) {
178                self.manager.checkpoint(checkpoint)?;
179                return Ok(());
180            }
181        }
182
183        // Warn if a checkpoint has been pending for too long (> 30s)
184        if let Some(ref pending) = self.pending {
185            if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
186                warn!(
187                    "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
188                    pending.checkpoint_id,
189                    pending.started_at.elapsed().as_secs_f64()
190                );
191            }
192        }
193
194        Ok(())
195    }
196
197    /// Check if a checkpoint should be initiated based on interval.
198    pub fn should_checkpoint(&self) -> bool {
199        self.pending.is_none() && self.manager.should_checkpoint()
200    }
201
202    /// Whether a checkpoint is currently in progress (waiting for acks).
203    pub const fn has_pending(&self) -> bool {
204        self.pending.is_some()
205    }
206}
207
/// Configuration for a named context
#[derive(Debug, Clone)]
pub struct ContextConfig {
    /// Unique context name — used as the key for routing and stream assignment
    pub name: String,
    /// Optional CPU core ids the context's OS thread should be pinned to
    pub cores: Option<Vec<usize>>,
}
214
/// Maps streams/connectors to their assigned context.
///
/// Built during `Engine::load()` by processing `ContextDecl` statements
/// and `StreamOp::Context` / `Emit { target_context }` operations.
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    /// context_name -> config
    contexts: HashMap<String, ContextConfig>,
    /// stream_name -> context_name
    stream_assignments: HashMap<String, String>,
    /// (stream_name, emit_index) -> target_context for cross-context emits
    cross_context_emits: HashMap<(String, usize), String>,
}
228
229impl ContextMap {
230    pub fn new() -> Self {
231        Self::default()
232    }
233
234    /// Register a context declaration
235    pub fn register_context(&mut self, config: ContextConfig) {
236        self.contexts.insert(config.name.clone(), config);
237    }
238
239    /// Assign a stream to a context
240    pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
241        self.stream_assignments.insert(stream_name, context_name);
242    }
243
244    /// Record a cross-context emit
245    pub fn add_cross_context_emit(
246        &mut self,
247        stream_name: String,
248        emit_index: usize,
249        target_context: String,
250    ) {
251        self.cross_context_emits
252            .insert((stream_name, emit_index), target_context);
253    }
254
255    /// Check if any contexts have been declared
256    pub fn has_contexts(&self) -> bool {
257        !self.contexts.is_empty()
258    }
259
260    /// Get all declared contexts
261    pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
262        &self.contexts
263    }
264
265    /// Get the context assignment for a stream
266    pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
267        self.stream_assignments.get(stream_name).map(|s| s.as_str())
268    }
269
270    /// Get all stream assignments
271    pub const fn stream_assignments(&self) -> &HashMap<String, String> {
272        &self.stream_assignments
273    }
274
275    /// Get all cross-context emits
276    pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
277        &self.cross_context_emits
278    }
279}
280
281/// Filter a program to keep only the streams assigned to a specific context.
282///
283/// Retains all non-stream statements (ContextDecl, ConnectorDecl, VarDecl,
284/// Assignment, FnDecl, EventDecl, PatternDecl, Config) since they may be
285/// needed by any context. Only `StreamDecl` statements are filtered based
286/// on context assignment.
287pub fn filter_program_for_context(
288    program: &Program,
289    context_name: &str,
290    context_map: &ContextMap,
291) -> Program {
292    let filtered_statements = program
293        .statements
294        .iter()
295        .filter(|stmt| {
296            if let Stmt::StreamDecl { name, .. } = &stmt.node {
297                // Keep the stream only if it's assigned to this context
298                match context_map.stream_context(name) {
299                    Some(ctx) => ctx == context_name,
300                    // Unassigned streams are kept in all contexts for backward compat
301                    None => true,
302                }
303            } else {
304                // Keep all non-stream statements
305                true
306            }
307        })
308        .cloned()
309        .collect();
310
311    Program {
312        statements: filtered_statements,
313    }
314}
315
/// Parse a Linux cpu-list string (e.g. `"0-3,6,8-9"`) into individual core
/// indices. Range segments are expanded inclusively; malformed segments are
/// skipped rather than failing the whole list.
///
/// Extracted from `verify_cpu_affinity` so the parsing logic is testable
/// without reading `/proc`.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
fn parse_cpu_list(list_str: &str) -> Vec<usize> {
    let mut cores = Vec::new();
    for part in list_str.split(',') {
        let part = part.trim();
        if let Some((start, end)) = part.split_once('-') {
            // Range segment like "0-3": expand to every core in the range.
            if let (Ok(s), Ok(e)) = (start.parse::<usize>(), end.parse::<usize>()) {
                cores.extend(s..=e);
            }
        } else if let Ok(core) = part.parse::<usize>() {
            cores.push(core);
        }
    }
    cores
}

/// Verify the CPU affinity of the current thread by reading /proc/self/status.
///
/// Returns the list of CPU cores the current thread is allowed to run on,
/// or `None` if the information cannot be read (file unreadable or the
/// `Cpus_allowed_list` line is absent).
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    // `strip_prefix` both matches the line and yields the value after the
    // colon in one step (the prefix includes the ':').
    status.lines().find_map(|line| {
        line.strip_prefix("Cpus_allowed_list:")
            .map(|rest| parse_cpu_list(rest.trim()))
    })
}
344
/// A self-contained single-threaded runtime for one context.
///
/// Owns its streams, processes events without locks. Receives events from
/// its inbound channel and forwards cross-context events via outbound channels.
pub struct ContextRuntime {
    /// Context name — used in log lines and checkpoint acks
    name: String,
    /// This context's private engine instance (single-threaded, no locks)
    engine: Engine,
    /// Main output channel (tenant/CLI)
    output_tx: mpsc::Sender<Event>,
    /// Inbound messages from orchestrator (events + barriers)
    event_rx: mpsc::Receiver<ContextMessage>,
    /// Engine's emitted events receiver
    engine_output_rx: mpsc::Receiver<Event>,
    /// Senders to all contexts (including self, for intra-context derived streams)
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// event_type → context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,
    /// Checkpoint ack sender (if coordinated checkpointing is enabled)
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    /// Inbound events processed — reported in the shutdown log line
    events_processed: u64,
    /// Engine output events forwarded downstream — reported at shutdown
    output_events_emitted: u64,
}
369
/// Manual `Debug` impl: reports summary fields only and omits the engine
/// and channel endpoints (`finish_non_exhaustive` marks the omission).
impl std::fmt::Debug for ContextRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContextRuntime")
            .field("name", &self.name)
            .field("ingress_routing", &self.ingress_routing)
            .field("events_processed", &self.events_processed)
            .field("output_events_emitted", &self.output_events_emitted)
            .finish_non_exhaustive()
    }
}
380
381impl ContextRuntime {
382    /// Create a new context runtime
383    #[allow(clippy::too_many_arguments)]
384    pub const fn new(
385        name: String,
386        engine: Engine,
387        output_tx: mpsc::Sender<Event>,
388        event_rx: mpsc::Receiver<ContextMessage>,
389        engine_output_rx: mpsc::Receiver<Event>,
390        all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
391        ingress_routing: FxHashMap<String, String>,
392        shutdown_rx: watch::Receiver<bool>,
393    ) -> Self {
394        Self {
395            name,
396            engine,
397            output_tx,
398            event_rx,
399            engine_output_rx,
400            all_context_txs,
401            ingress_routing,
402            shutdown_rx,
403            ack_tx: None,
404            events_processed: 0,
405            output_events_emitted: 0,
406        }
407    }
408
409    /// Set the checkpoint acknowledgment sender for coordinated checkpointing.
410    pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
411        self.ack_tx = Some(ack_tx);
412        self
413    }
414
415    /// Drain engine output events and route them to consuming contexts
416    /// and the main output channel.
417    fn drain_and_route_output(&mut self) {
418        while let Ok(output_event) = self.engine_output_rx.try_recv() {
419            self.output_events_emitted += 1;
420
421            // Route to consuming context if any
422            if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
423                if let Some(tx) = self.all_context_txs.get(target_ctx) {
424                    let shared = Arc::new(output_event);
425                    let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
426                    // Unwrap the Arc for the output channel (we hold the only other ref)
427                    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
428                    let _ = self.output_tx.try_send(owned);
429                    continue;
430                }
431            }
432
433            // Always forward to main output channel
434            let _ = self.output_tx.try_send(output_event);
435        }
436    }
437
438    /// Handle a checkpoint barrier by snapshotting engine state and sending ack.
439    async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
440        if let Some(ref ack_tx) = self.ack_tx {
441            let checkpoint = self.engine.create_checkpoint();
442            let _ = ack_tx
443                .send(CheckpointAck {
444                    context_name: self.name.clone(),
445                    checkpoint_id: barrier.checkpoint_id,
446                    engine_checkpoint: checkpoint,
447                })
448                .await;
449        }
450    }
451
452    /// Run the event loop. Blocks the current thread.
453    ///
454    /// Receives events from the inbound channel, processes them through
455    /// the engine, and forwards cross-context events as needed.
456    ///
457    /// If the engine has session windows, a periodic sweep timer runs
458    /// every `gap` duration to close stale sessions. This ensures sessions
459    /// are emitted even when no new events arrive.
460    pub async fn run(&mut self) {
461        info!("Context '{}' runtime started", self.name);
462
463        #[cfg(target_os = "linux")]
464        if let Some(cores) = verify_cpu_affinity() {
465            info!("Context '{}' running on cores {:?}", self.name, cores);
466        }
467
468        // Compute sweep interval from engine's session window gaps
469        let has_sessions = self.engine.has_session_windows();
470        let sweep_interval = self
471            .engine
472            .min_session_gap()
473            .and_then(|d| d.to_std().ok())
474            .unwrap_or(std::time::Duration::from_secs(60));
475
476        let mut sweep_timer = tokio::time::interval(sweep_interval);
477        sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
478        // Skip the first immediate tick
479        sweep_timer.tick().await;
480
481        loop {
482            tokio::select! {
483                biased;
484
485                _ = self.shutdown_rx.changed() => {
486                    if *self.shutdown_rx.borrow() {
487                        info!("Context '{}' received shutdown signal", self.name);
488                        // On shutdown: flush all remaining sessions
489                        if has_sessions {
490                            if let Err(e) = self.engine.flush_expired_sessions().await {
491                                error!("Context '{}' shutdown session flush error: {}", self.name, e);
492                            }
493                            self.drain_and_route_output();
494                        }
495                        break;
496                    }
497                }
498
499                _ = sweep_timer.tick(), if has_sessions => {
500                    match self.engine.flush_expired_sessions().await {
501                        Ok(()) => {}
502                        Err(e) => {
503                            error!("Context '{}' session sweep error: {}", self.name, e);
504                        }
505                    }
506                    self.drain_and_route_output();
507                }
508
509                msg = self.event_rx.recv() => {
510                    match msg {
511                        Some(ContextMessage::Event(event)) => {
512                            self.events_processed += 1;
513
514                            // Process the event through the engine (zero-copy via SharedEvent)
515                            match self.engine.process_shared(Arc::clone(&event)).await {
516                                Ok(()) => {}
517                                Err(e) => {
518                                    error!("Context '{}' processing error: {}", self.name, e);
519                                }
520                            }
521
522                            self.drain_and_route_output();
523                        }
524                        Some(ContextMessage::CheckpointBarrier(barrier)) => {
525                            self.handle_checkpoint_barrier(barrier).await;
526                        }
527                        Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
528                            // Feed watermark into engine's tracker (Phase 2E)
529                            let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
530                        }
531                        None => {
532                            // Channel closed
533                            break;
534                        }
535                    }
536                }
537            }
538        }
539
540        // Drop cross-context senders so other contexts can shut down too
541        self.all_context_txs.clear();
542
543        info!(
544            "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
545            self.name, self.events_processed, self.output_events_emitted
546        );
547    }
548}
549
/// Direct event-type-to-channel router for non-blocking dispatch.
///
/// Maps `event_type → Sender<ContextMessage>` directly (single HashMap lookup),
/// uses `try_send()` for non-blocking dispatch, and is cheaply cloneable
/// via `Arc<HashMap>` for multi-producer scenarios.
#[derive(Clone)]
pub struct EventTypeRouter {
    /// event_type → channel of the context that consumes that type
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    /// Fallback channel for event types with no explicit route
    default_tx: mpsc::Sender<ContextMessage>,
}
560
/// Manual `Debug` impl: shows route count and keys; the channel senders
/// themselves are omitted (`finish_non_exhaustive` marks the omission).
impl std::fmt::Debug for EventTypeRouter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventTypeRouter")
            .field("route_count", &self.routes.len())
            .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
            .finish_non_exhaustive()
    }
}
569
/// Errors returned by non-blocking dispatch methods.
///
/// Each variant carries the undelivered message back to the caller so it
/// can be retried or redirected.
#[derive(Debug)]
pub enum DispatchError {
    /// Channel is full — caller should retry or use async dispatch
    ChannelFull(ContextMessage),
    /// Channel is closed — context has shut down
    ChannelClosed(ContextMessage),
}
578
579impl EventTypeRouter {
580    /// Non-blocking dispatch via `try_send()`.
581    ///
582    /// Routes the event to the correct context channel based on event type.
583    /// Returns immediately without waiting for channel capacity.
584    pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
585        let tx = self
586            .routes
587            .get(&*event.event_type)
588            .unwrap_or(&self.default_tx);
589        let msg = ContextMessage::Event(event);
590        match tx.try_send(msg) {
591            Ok(()) => Ok(()),
592            Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
593            Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
594        }
595    }
596
597    /// Blocking dispatch via `send().await`.
598    ///
599    /// Waits for channel capacity if the channel is full.
600    pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
601        let event_type = event.event_type.clone();
602        let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
603        tx.send(ContextMessage::Event(event))
604            .await
605            .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
606    }
607
608    /// Batch dispatch — non-blocking, returns errors for any events that could not be sent.
609    pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
610        let mut errors = Vec::new();
611        for event in events {
612            if let Err(e) = self.dispatch(event) {
613                errors.push(e);
614            }
615        }
616        errors
617    }
618}
619
/// Orchestrates multiple ContextRuntimes across OS threads.
///
/// Routes incoming events to the correct context based on event type
/// and stream assignments.
pub struct ContextOrchestrator {
    /// Senders to each context's event channel
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// Thread handles for each context
    handles: Vec<std::thread::JoinHandle<()>>,
    /// event_type -> context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal sender (each context thread holds a subscriber)
    shutdown_tx: watch::Sender<bool>,
    /// Direct event-type-to-channel router
    router: EventTypeRouter,
    /// Optional checkpoint coordinator for exactly-once semantics
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
638
/// Manual `Debug` impl: reports counts and the routing table; channels,
/// router, and coordinator are omitted (`finish_non_exhaustive`).
impl std::fmt::Debug for ContextOrchestrator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContextOrchestrator")
            .field("context_count", &self.context_txs.len())
            .field("ingress_routing", &self.ingress_routing)
            .field("handle_count", &self.handles.len())
            .finish_non_exhaustive()
    }
}
648
649impl ContextOrchestrator {
650    /// Build the orchestrator from engine state.
651    ///
652    /// For each declared context:
653    /// 1. Creates a bounded mpsc channel
654    /// 2. Creates an Engine with only the streams assigned to that context
655    /// 3. Spawns an OS thread with optional CPU affinity
656    /// 4. Inside the thread: creates a single-threaded Tokio runtime
657    ///    and runs the ContextRuntime event loop
658    pub fn build(
659        context_map: &ContextMap,
660        program: &Program,
661        output_tx: mpsc::Sender<Event>,
662        channel_capacity: usize,
663    ) -> Result<Self, String> {
664        Self::build_with_checkpoint(
665            context_map,
666            program,
667            output_tx,
668            channel_capacity,
669            None,
670            None,
671        )
672    }
673
674    /// Build the orchestrator with optional checkpoint configuration and recovery state.
675    pub fn build_with_checkpoint(
676        context_map: &ContextMap,
677        program: &Program,
678        output_tx: mpsc::Sender<Event>,
679        channel_capacity: usize,
680        checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
681        recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
682    ) -> Result<Self, String> {
683        let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
684        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
685
686        // Create shutdown signal
687        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
688
689        // Determine default context
690        let default_context = context_map
691            .contexts()
692            .keys()
693            .next()
694            .cloned()
695            .unwrap_or_else(|| "default".to_string());
696
697        // Create cross-context senders: first pass to create all channels
698        let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
699            FxHashMap::default();
700        for ctx_name in context_map.contexts().keys() {
701            let (tx, rx) = mpsc::channel(channel_capacity);
702            context_txs.insert(ctx_name.clone(), tx);
703            context_rxs.insert(ctx_name.clone(), rx);
704        }
705
706        // Set up checkpoint coordinator if configured
707        let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
708        let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
709            let manager = CheckpointManager::new(store, config)
710                .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
711                .unwrap();
712            CheckpointCoordinator::new(manager, context_names.clone())
713        });
714        let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
715
716        // Build ingress routing: event_type -> context_name
717        let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
718
719        // First pass: route raw event types from stream sources to contexts
720        for stmt in &program.statements {
721            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
722                if let Some(ctx_name) = context_map.stream_context(name) {
723                    let event_types = Self::event_types_from_source(source);
724                    for et in event_types {
725                        ingress_routing.insert(et, ctx_name.to_string());
726                    }
727                }
728            }
729        }
730
731        // Second pass: route derived stream output types to consuming contexts
732        for stmt in &program.statements {
733            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
734                if let Some(ctx_name) = context_map.stream_context(name) {
735                    if let StreamSource::Ident(source_stream) = source {
736                        if context_map.stream_context(source_stream).is_some() {
737                            ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
738                        }
739                    }
740                }
741            }
742        }
743
744        // Third pass: validate cross-context emit targets
745        for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
746            if !context_txs.contains_key(target_ctx) {
747                warn!(
748                    "Cross-context emit targets unknown context '{}'",
749                    target_ctx
750                );
751            }
752        }
753
754        // Build EventTypeRouter: event_type → Sender directly (single lookup)
755        let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
756            FxHashMap::default();
757        for (event_type, ctx_name) in &ingress_routing {
758            if let Some(tx) = context_txs.get(ctx_name) {
759                event_type_txs.insert(event_type.clone(), tx.clone());
760            }
761        }
762        let default_tx = context_txs
763            .get(&default_context)
764            .cloned()
765            .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
766        let router = EventTypeRouter {
767            routes: Arc::new(event_type_txs),
768            default_tx,
769        };
770
771        // Clone context_map for use inside thread spawning
772        let context_map_clone = context_map.clone();
773
774        // Clone recovery state per context
775        let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
776            .map(|cp| cp.context_states.clone())
777            .unwrap_or_default();
778
779        // Spawn a thread for each context
780        for (ctx_name, config) in context_map.contexts() {
781            let rx = context_rxs
782                .remove(ctx_name)
783                .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
784
785            let ctx_output_tx = output_tx.clone();
786            let ctx_name_clone = ctx_name.clone();
787            let cores = config.cores.clone();
788
789            // Clone all context senders for cross-context forwarding
790            let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
791                .iter()
792                .map(|(k, v)| (k.clone(), v.clone()))
793                .collect();
794
795            // Filter the program to only include this context's streams
796            let filtered_program =
797                filter_program_for_context(program, ctx_name, &context_map_clone);
798            let ingress_routing_clone = ingress_routing.clone();
799            let shutdown_rx = shutdown_tx.subscribe();
800            let ctx_ack_tx = ack_tx.clone();
801            let ctx_recovery = recovery_states.get(ctx_name).cloned();
802
803            let handle = std::thread::Builder::new()
804                .name(format!("varpulis-ctx-{ctx_name}"))
805                .spawn(move || {
806                    // Set CPU affinity if specified
807                    if let Some(ref core_ids) = cores {
808                        Self::set_cpu_affinity(&ctx_name_clone, core_ids);
809                    }
810
811                    // Create a single-threaded Tokio runtime for this context
812                    let rt = tokio::runtime::Builder::new_current_thread()
813                        .enable_all()
814                        .build()
815                        .expect("Failed to create Tokio runtime for context");
816
817                    rt.block_on(async move {
818                        // Create engine for this context with filtered program
819                        let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
820                        let mut engine = Engine::new(engine_output_tx);
821                        engine.set_context_name(&ctx_name_clone);
822                        if let Err(e) = engine.load(&filtered_program) {
823                            error!(
824                                "Failed to load program for context '{}': {}",
825                                ctx_name_clone, e
826                            );
827                            return;
828                        }
829
830                        // Connect sinks (MQTT, Kafka, etc.) after load
831                        if let Err(e) = engine.connect_sinks().await {
832                            error!(
833                                "Failed to connect sinks for context '{}': {}",
834                                ctx_name_clone, e
835                            );
836                            return;
837                        }
838
839                        // Restore from checkpoint if available
840                        if let Some(cp) = ctx_recovery {
841                            if let Err(e) = engine.restore_checkpoint(&cp) {
842                                tracing::error!(
843                                    "Context {} failed to restore checkpoint: {}",
844                                    ctx_name_clone,
845                                    e
846                                );
847                                return;
848                            }
849                        }
850
851                        let mut ctx_runtime = ContextRuntime::new(
852                            ctx_name_clone,
853                            engine,
854                            ctx_output_tx,
855                            rx,
856                            engine_output_rx,
857                            all_txs,
858                            ingress_routing_clone,
859                            shutdown_rx,
860                        );
861
862                        if let Some(ack_tx) = ctx_ack_tx {
863                            ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
864                        }
865
866                        ctx_runtime.run().await;
867                    });
868                })
869                .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
870
871            handles.push(handle);
872        }
873
874        Ok(Self {
875            context_txs,
876            handles,
877            ingress_routing,
878            shutdown_tx,
879            router,
880            checkpoint_coordinator,
881        })
882    }
883
884    /// Route an incoming event to the correct context (async, waits on backpressure).
885    pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
886        self.router.dispatch_await(event).await
887    }
888
889    /// Non-blocking dispatch — returns `ChannelFull` instead of waiting.
890    pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
891        self.router.dispatch(event)
892    }
893
894    /// Batch dispatch — non-blocking, returns errors for events that could not be sent.
895    pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
896        self.router.dispatch_batch(events)
897    }
898
899    /// Get a cloneable router handle for direct multi-producer dispatch.
900    pub fn router(&self) -> EventTypeRouter {
901        self.router.clone()
902    }
903
904    /// Shut down all context threads.
905    ///
906    /// Sends shutdown signal, drops senders, and waits for threads to finish.
907    pub fn shutdown(self) {
908        // Signal all contexts to shut down
909        let _ = self.shutdown_tx.send(true);
910
911        // Drop all senders to unblock any recv() calls
912        drop(self.context_txs);
913
914        // Wait for all threads to finish
915        for handle in self.handles {
916            if let Err(e) = handle.join() {
917                error!("Context thread panicked: {:?}", e);
918            }
919        }
920
921        info!("All context runtimes shut down");
922    }
923
924    /// Trigger a checkpoint across all contexts.
925    ///
926    /// Sends a `CheckpointBarrier` to every context. Each context will snapshot
927    /// its engine state and send a `CheckpointAck` back. Call `try_complete_checkpoint()`
928    /// afterwards to drain acks and persist.
929    pub fn trigger_checkpoint(&mut self) {
930        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
931            coordinator.initiate(&self.context_txs);
932        }
933    }
934
935    /// Try to complete a pending checkpoint by draining acknowledgments.
936    ///
937    /// Returns `true` if a checkpoint was fully completed and persisted.
938    pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
939        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
940            let had_pending = coordinator.has_pending();
941            coordinator.try_complete()?;
942            // If we had a pending checkpoint and now it's gone, we completed it
943            Ok(had_pending && !coordinator.has_pending())
944        } else {
945            Ok(false)
946        }
947    }
948
949    /// Check if a periodic checkpoint should be triggered (based on configured interval).
950    pub fn should_checkpoint(&self) -> bool {
951        self.checkpoint_coordinator
952            .as_ref()
953            .is_some_and(|c| c.should_checkpoint())
954    }
955
956    /// Run one checkpoint cycle: trigger if due, then try to complete.
957    ///
958    /// Call this periodically from the main event loop (e.g., every second or on a timer).
959    pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
960        if self.should_checkpoint() {
961            self.trigger_checkpoint();
962        }
963        self.try_complete_checkpoint()
964    }
965
966    /// Get the names of all running contexts
967    pub fn context_names(&self) -> Vec<&str> {
968        self.context_txs.keys().map(|s| s.as_str()).collect()
969    }
970
971    /// Get the ingress routing table (for testing/debugging)
972    pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
973        &self.ingress_routing
974    }
975
976    /// Extract event types consumed by a stream source
977    fn event_types_from_source(source: &StreamSource) -> Vec<String> {
978        match source {
979            StreamSource::Ident(name) => vec![name.clone()],
980            StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
981            StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
982            StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
983            StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
984            StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
985            StreamSource::Sequence(decl) => {
986                decl.steps.iter().map(|s| s.event_type.clone()).collect()
987            }
988            StreamSource::Timer(_) => vec![],
989        }
990    }
991
    /// Set CPU affinity for the current thread
    ///
    /// NOTE: only the *first* entry of `core_ids` is pinned; any additional
    /// cores in the config are ignored here (each context runs a
    /// single-threaded runtime, so a single core is used — TODO confirm
    /// multi-core pinning is intentionally unsupported). On non-Linux
    /// platforms this is a no-op that logs at debug level.
    fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
        #[cfg(target_os = "linux")]
        {
            use core_affinity::CoreId;
            // Pin to the first listed core; failure is logged but non-fatal.
            if let Some(&first_core) = core_ids.first() {
                let core_id = CoreId { id: first_core };
                if core_affinity::set_for_current(core_id) {
                    info!("Context '{}' pinned to core {}", ctx_name, first_core);
                } else {
                    warn!(
                        "Failed to pin context '{}' to core {}",
                        ctx_name, first_core
                    );
                }
            }
        }

        #[cfg(not(target_os = "linux"))]
        {
            tracing::debug!(
                "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
                ctx_name,
                core_ids
            );
        }
    }
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh ContextMap has no contexts (single-threaded mode).
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    // Registering a context stores its config, including core assignments.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    // Stream → context assignment is queryable; unassigned streams yield None.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    // Cross-context emits are keyed by (stream name, emit index).
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // No declared contexts means backward-compatible single-threaded mode.
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    // A context config may omit core pinning entirely.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    // Multiple contexts coexist with independent stream assignments.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    // A plain Ident source consumes exactly its named event type.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    // Filtering keeps only the target context's streams but preserves all
    // ContextDecls so every context knows the full topology.
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    // Ingress routing covers derived types: a stream's output name (RawData)
    // routes to the context of the downstream stream that consumes it.
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    // Cross-context emit registration is reflected in the emit table.
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Linux-only: the current process should report a non-empty allowed-core set.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}