// varpulis_runtime/context.rs
//! Context-based multi-threaded execution architecture.
//!
//! Named contexts provide isolated execution domains. Each context runs on its own
//! OS thread with a single-threaded Tokio runtime, enabling true parallelism without
//! locks within a context. Cross-context communication uses bounded `mpsc` channels.
//!
//! When no contexts are declared, the engine runs in single-threaded mode with zero
//! overhead (backward compatible).
10use crate::engine::Engine;
11use crate::event::{Event, SharedEvent};
12use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
13use rustc_hash::FxHashMap;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Instant;
17use tokio::sync::{mpsc, watch};
18use tracing::{error, info, warn};
19use varpulis_core::ast::{Program, Stmt, StreamSource};
20
/// Messages sent through context channels.
///
/// Wraps either a regular event or a checkpoint barrier for exactly-once semantics.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// A regular event to process (shared via `Arc` so fan-out avoids deep copies)
    Event(SharedEvent),
    /// A checkpoint barrier — triggers a state snapshot in the receiving context
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark update from an upstream context
    WatermarkUpdate {
        /// Name of the context whose watermark advanced
        source_context: String,
        /// New watermark position in epoch milliseconds
        watermark_ms: i64,
    },
}
36
/// A checkpoint barrier flowing through the context DAG.
///
/// Broadcast by the `CheckpointCoordinator` to every context; on receipt each
/// context snapshots its engine state and replies with a `CheckpointAck`.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Identifier of the checkpoint this barrier belongs to (monotonically increasing)
    pub checkpoint_id: u64,
    /// Wall-clock time (epoch ms) at which the checkpoint was initiated
    pub timestamp_ms: i64,
}
43
/// Acknowledgment from a context after completing a checkpoint.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acknowledging context
    pub context_name: String,
    /// Identifier of the checkpoint being acknowledged
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state taken at the barrier
    pub engine_checkpoint: EngineCheckpoint,
}
51
/// Tracks a pending coordinated checkpoint across all contexts.
struct PendingCheckpoint {
    /// Identifier of the in-flight checkpoint
    checkpoint_id: u64,
    /// Initiation time in epoch milliseconds
    timestamp_ms: i64,
    /// Acks received so far: context name → engine state snapshot
    acks: HashMap<String, EngineCheckpoint>,
    /// When the checkpoint was initiated (used for stall warnings)
    started_at: Instant,
}
59
/// Coordinates checkpoints across multiple contexts.
///
/// Sends `CheckpointBarrier` to all contexts, collects `CheckpointAck` responses,
/// and persists the assembled `Checkpoint` once all contexts have acknowledged.
pub struct CheckpointCoordinator {
    /// Persists completed checkpoints and decides when a new one is due
    manager: CheckpointManager,
    /// Sender cloned into each context for posting acknowledgments
    ack_tx: mpsc::Sender<CheckpointAck>,
    /// Receives acknowledgments from all contexts
    ack_rx: mpsc::Receiver<CheckpointAck>,
    /// Names of every context that must ack each checkpoint
    context_names: Vec<String>,
    /// The checkpoint currently awaiting acks, if any
    pending: Option<PendingCheckpoint>,
    /// Id assigned to the next initiated checkpoint (starts at 1)
    next_checkpoint_id: u64,
}
72
73impl std::fmt::Debug for CheckpointCoordinator {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("CheckpointCoordinator")
76            .field("context_names", &self.context_names)
77            .field("next_checkpoint_id", &self.next_checkpoint_id)
78            .field("has_pending", &self.pending.is_some())
79            .finish_non_exhaustive()
80    }
81}
82
83impl CheckpointCoordinator {
84    /// Create a new coordinator for the given contexts.
85    pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
86        let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
87        Self {
88            manager,
89            ack_tx,
90            ack_rx,
91            context_names,
92            pending: None,
93            next_checkpoint_id: 1,
94        }
95    }
96
97    /// Get a sender for checkpoint acknowledgments (cloned into each context).
98    pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
99        self.ack_tx.clone()
100    }
101
102    /// Initiate a new checkpoint by sending barriers to all contexts.
103    pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
104        if self.pending.is_some() {
105            warn!("Checkpoint already in progress, skipping initiation");
106            return;
107        }
108
109        let checkpoint_id = self.next_checkpoint_id;
110        self.next_checkpoint_id += 1;
111        let timestamp_ms = chrono::Utc::now().timestamp_millis();
112
113        let barrier = CheckpointBarrier {
114            checkpoint_id,
115            timestamp_ms,
116        };
117
118        for (ctx_name, tx) in context_txs {
119            if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
120                error!(
121                    "Failed to send checkpoint barrier to context '{}': {}",
122                    ctx_name, e
123                );
124            }
125        }
126
127        self.pending = Some(PendingCheckpoint {
128            checkpoint_id,
129            timestamp_ms,
130            acks: HashMap::new(),
131            started_at: Instant::now(),
132        });
133
134        info!("Initiated checkpoint {}", checkpoint_id);
135    }
136
137    /// Receive an acknowledgment. Returns a complete `Checkpoint` when all contexts have acked.
138    pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
139        let pending = self.pending.as_mut()?;
140
141        if ack.checkpoint_id != pending.checkpoint_id {
142            warn!(
143                "Received ack for checkpoint {} but expecting {}",
144                ack.checkpoint_id, pending.checkpoint_id
145            );
146            return None;
147        }
148
149        pending.acks.insert(ack.context_name, ack.engine_checkpoint);
150
151        if pending.acks.len() == self.context_names.len() {
152            let pending = self.pending.take().unwrap();
153            let mut context_states = HashMap::new();
154            for (name, cp) in pending.acks {
155                context_states.insert(name, cp);
156            }
157
158            Some(crate::persistence::Checkpoint {
159                id: pending.checkpoint_id,
160                timestamp_ms: pending.timestamp_ms,
161                events_processed: 0, // Filled from context states
162                window_states: HashMap::new(),
163                pattern_states: HashMap::new(),
164                metadata: HashMap::new(),
165                context_states,
166            })
167        } else {
168            None
169        }
170    }
171
172    /// Try to drain pending acks and complete the checkpoint.
173    pub fn try_complete(&mut self) -> Result<(), StoreError> {
174        while let Ok(ack) = self.ack_rx.try_recv() {
175            if let Some(checkpoint) = self.receive_ack(ack) {
176                self.manager.checkpoint(checkpoint)?;
177                return Ok(());
178            }
179        }
180
181        // Warn if a checkpoint has been pending for too long (> 30s)
182        if let Some(ref pending) = self.pending {
183            if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
184                warn!(
185                    "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
186                    pending.checkpoint_id,
187                    pending.started_at.elapsed().as_secs_f64()
188                );
189            }
190        }
191
192        Ok(())
193    }
194
195    /// Check if a checkpoint should be initiated based on interval.
196    pub fn should_checkpoint(&self) -> bool {
197        self.pending.is_none() && self.manager.should_checkpoint()
198    }
199
200    /// Whether a checkpoint is currently in progress (waiting for acks).
201    pub const fn has_pending(&self) -> bool {
202        self.pending.is_some()
203    }
204}
205
/// Configuration for a named context
#[derive(Debug, Clone)]
pub struct ContextConfig {
    /// Unique context name (used for routing and thread naming)
    pub name: String,
    /// Optional CPU core ids to pin the context's OS thread to
    pub cores: Option<Vec<usize>>,
}
212
/// Maps streams/connectors to their assigned context.
///
/// Built during `Engine::load()` by processing `ContextDecl` statements
/// and `StreamOp::Context` / `Emit { target_context }` operations.
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    /// context_name -> config
    contexts: HashMap<String, ContextConfig>,
    /// stream_name -> context_name
    stream_assignments: HashMap<String, String>,
    /// (stream_name, emit_index) -> target_context for cross-context emits
    cross_context_emits: HashMap<(String, usize), String>,
}
226
227impl ContextMap {
228    pub fn new() -> Self {
229        Self::default()
230    }
231
232    /// Register a context declaration
233    pub fn register_context(&mut self, config: ContextConfig) {
234        self.contexts.insert(config.name.clone(), config);
235    }
236
237    /// Assign a stream to a context
238    pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
239        self.stream_assignments.insert(stream_name, context_name);
240    }
241
242    /// Record a cross-context emit
243    pub fn add_cross_context_emit(
244        &mut self,
245        stream_name: String,
246        emit_index: usize,
247        target_context: String,
248    ) {
249        self.cross_context_emits
250            .insert((stream_name, emit_index), target_context);
251    }
252
253    /// Check if any contexts have been declared
254    pub fn has_contexts(&self) -> bool {
255        !self.contexts.is_empty()
256    }
257
258    /// Get all declared contexts
259    pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
260        &self.contexts
261    }
262
263    /// Get the context assignment for a stream
264    pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
265        self.stream_assignments.get(stream_name).map(|s| s.as_str())
266    }
267
268    /// Get all stream assignments
269    pub const fn stream_assignments(&self) -> &HashMap<String, String> {
270        &self.stream_assignments
271    }
272
273    /// Get all cross-context emits
274    pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
275        &self.cross_context_emits
276    }
277}
278
279/// Filter a program to keep only the streams assigned to a specific context.
280///
281/// Retains all non-stream statements (ContextDecl, ConnectorDecl, VarDecl,
282/// Assignment, FnDecl, EventDecl, PatternDecl, Config) since they may be
283/// needed by any context. Only `StreamDecl` statements are filtered based
284/// on context assignment.
285pub fn filter_program_for_context(
286    program: &Program,
287    context_name: &str,
288    context_map: &ContextMap,
289) -> Program {
290    let filtered_statements = program
291        .statements
292        .iter()
293        .filter(|stmt| {
294            if let Stmt::StreamDecl { name, .. } = &stmt.node {
295                // Keep the stream only if it's assigned to this context
296                match context_map.stream_context(name) {
297                    Some(ctx) => ctx == context_name,
298                    // Unassigned streams are kept in all contexts for backward compat
299                    None => true,
300                }
301            } else {
302                // Keep all non-stream statements
303                true
304            }
305        })
306        .cloned()
307        .collect();
308
309    Program {
310        statements: filtered_statements,
311    }
312}
313
/// Verify the CPU affinity of the current thread by reading /proc/self/status.
///
/// Returns the list of CPU cores the current thread is allowed to run on,
/// or `None` if the information cannot be read.
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    for line in status.lines() {
        // The kernel reports the mask as a list like "0-3,6,8-10".
        let rest = match line.strip_prefix("Cpus_allowed_list:") {
            Some(r) => r.trim(),
            None => continue,
        };
        let mut cores = Vec::new();
        for chunk in rest.split(',') {
            let chunk = chunk.trim();
            match chunk.split_once('-') {
                // Inclusive range of cores, e.g. "0-3".
                Some((lo, hi)) => {
                    if let (Ok(lo), Ok(hi)) = (lo.parse::<usize>(), hi.parse::<usize>()) {
                        cores.extend(lo..=hi);
                    }
                }
                // A single core id, e.g. "6".
                None => {
                    if let Ok(core) = chunk.parse::<usize>() {
                        cores.push(core);
                    }
                }
            }
        }
        return Some(cores);
    }
    None
}
342
/// A self-contained single-threaded runtime for one context.
///
/// Owns its streams, processes events without locks. Receives events from
/// its inbound channel and forwards cross-context events via outbound channels.
pub struct ContextRuntime {
    /// Context name (used in logs)
    name: String,
    /// Per-context engine holding only this context's streams
    engine: Engine,
    /// Main output channel (tenant/CLI)
    output_tx: mpsc::Sender<Event>,
    /// Inbound messages from orchestrator (events + barriers)
    event_rx: mpsc::Receiver<ContextMessage>,
    /// Engine's emitted events receiver
    engine_output_rx: mpsc::Receiver<Event>,
    /// Senders to all contexts (including self, for intra-context derived streams)
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// event_type → context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,
    /// Checkpoint ack sender (if coordinated checkpointing is enabled)
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    /// Number of inbound events processed (reported on shutdown)
    events_processed: u64,
    /// Number of engine output events forwarded downstream
    output_events_emitted: u64,
}
367
368impl std::fmt::Debug for ContextRuntime {
369    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370        f.debug_struct("ContextRuntime")
371            .field("name", &self.name)
372            .field("ingress_routing", &self.ingress_routing)
373            .field("events_processed", &self.events_processed)
374            .field("output_events_emitted", &self.output_events_emitted)
375            .finish_non_exhaustive()
376    }
377}
378
379impl ContextRuntime {
380    /// Create a new context runtime
381    #[allow(clippy::too_many_arguments)]
382    pub const fn new(
383        name: String,
384        engine: Engine,
385        output_tx: mpsc::Sender<Event>,
386        event_rx: mpsc::Receiver<ContextMessage>,
387        engine_output_rx: mpsc::Receiver<Event>,
388        all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
389        ingress_routing: FxHashMap<String, String>,
390        shutdown_rx: watch::Receiver<bool>,
391    ) -> Self {
392        Self {
393            name,
394            engine,
395            output_tx,
396            event_rx,
397            engine_output_rx,
398            all_context_txs,
399            ingress_routing,
400            shutdown_rx,
401            ack_tx: None,
402            events_processed: 0,
403            output_events_emitted: 0,
404        }
405    }
406
407    /// Set the checkpoint acknowledgment sender for coordinated checkpointing.
408    pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
409        self.ack_tx = Some(ack_tx);
410        self
411    }
412
413    /// Drain engine output events and route them to consuming contexts
414    /// and the main output channel.
415    fn drain_and_route_output(&mut self) {
416        while let Ok(output_event) = self.engine_output_rx.try_recv() {
417            self.output_events_emitted += 1;
418
419            // Route to consuming context if any
420            if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
421                if let Some(tx) = self.all_context_txs.get(target_ctx) {
422                    let shared = Arc::new(output_event);
423                    let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
424                    // Unwrap the Arc for the output channel (we hold the only other ref)
425                    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
426                    let _ = self.output_tx.try_send(owned);
427                    continue;
428                }
429            }
430
431            // Always forward to main output channel
432            let _ = self.output_tx.try_send(output_event);
433        }
434    }
435
436    /// Handle a checkpoint barrier by snapshotting engine state and sending ack.
437    async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
438        if let Some(ref ack_tx) = self.ack_tx {
439            let checkpoint = self.engine.create_checkpoint();
440            let _ = ack_tx
441                .send(CheckpointAck {
442                    context_name: self.name.clone(),
443                    checkpoint_id: barrier.checkpoint_id,
444                    engine_checkpoint: checkpoint,
445                })
446                .await;
447        }
448    }
449
450    /// Run the event loop. Blocks the current thread.
451    ///
452    /// Receives events from the inbound channel, processes them through
453    /// the engine, and forwards cross-context events as needed.
454    ///
455    /// If the engine has session windows, a periodic sweep timer runs
456    /// every `gap` duration to close stale sessions. This ensures sessions
457    /// are emitted even when no new events arrive.
458    pub async fn run(&mut self) {
459        info!("Context '{}' runtime started", self.name);
460
461        #[cfg(target_os = "linux")]
462        if let Some(cores) = verify_cpu_affinity() {
463            info!("Context '{}' running on cores {:?}", self.name, cores);
464        }
465
466        // Compute sweep interval from engine's session window gaps
467        let has_sessions = self.engine.has_session_windows();
468        let sweep_interval = self
469            .engine
470            .min_session_gap()
471            .and_then(|d| d.to_std().ok())
472            .unwrap_or(std::time::Duration::from_secs(60));
473
474        let mut sweep_timer = tokio::time::interval(sweep_interval);
475        sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
476        // Skip the first immediate tick
477        sweep_timer.tick().await;
478
479        loop {
480            tokio::select! {
481                biased;
482
483                _ = self.shutdown_rx.changed() => {
484                    if *self.shutdown_rx.borrow() {
485                        info!("Context '{}' received shutdown signal", self.name);
486                        // On shutdown: flush all remaining sessions
487                        if has_sessions {
488                            if let Err(e) = self.engine.flush_expired_sessions().await {
489                                error!("Context '{}' shutdown session flush error: {}", self.name, e);
490                            }
491                            self.drain_and_route_output();
492                        }
493                        break;
494                    }
495                }
496
497                _ = sweep_timer.tick(), if has_sessions => {
498                    match self.engine.flush_expired_sessions().await {
499                        Ok(()) => {}
500                        Err(e) => {
501                            error!("Context '{}' session sweep error: {}", self.name, e);
502                        }
503                    }
504                    self.drain_and_route_output();
505                }
506
507                msg = self.event_rx.recv() => {
508                    match msg {
509                        Some(ContextMessage::Event(event)) => {
510                            self.events_processed += 1;
511
512                            // Process the event through the engine (zero-copy via SharedEvent)
513                            match self.engine.process_shared(Arc::clone(&event)).await {
514                                Ok(()) => {}
515                                Err(e) => {
516                                    error!("Context '{}' processing error: {}", self.name, e);
517                                }
518                            }
519
520                            self.drain_and_route_output();
521                        }
522                        Some(ContextMessage::CheckpointBarrier(barrier)) => {
523                            self.handle_checkpoint_barrier(barrier).await;
524                        }
525                        Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
526                            // Feed watermark into engine's tracker (Phase 2E)
527                            let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
528                        }
529                        None => {
530                            // Channel closed
531                            break;
532                        }
533                    }
534                }
535            }
536        }
537
538        // Drop cross-context senders so other contexts can shut down too
539        self.all_context_txs.clear();
540
541        info!(
542            "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
543            self.name, self.events_processed, self.output_events_emitted
544        );
545    }
546}
547
/// Direct event-type-to-channel router for non-blocking dispatch.
///
/// Maps `event_type → Sender<ContextMessage>` directly (single HashMap lookup),
/// uses `try_send()` for non-blocking dispatch, and is cheaply cloneable
/// via `Arc<HashMap>` for multi-producer scenarios.
#[derive(Clone)]
pub struct EventTypeRouter {
    /// Shared routing table: event type → target context's channel
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    /// Fallback channel for event types with no explicit route
    default_tx: mpsc::Sender<ContextMessage>,
}
558
559impl std::fmt::Debug for EventTypeRouter {
560    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561        f.debug_struct("EventTypeRouter")
562            .field("route_count", &self.routes.len())
563            .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
564            .finish_non_exhaustive()
565    }
566}
567
/// Errors returned by non-blocking dispatch methods.
///
/// Each variant carries the undelivered message back to the caller so it
/// can be retried or re-sent via the awaiting dispatch path.
#[derive(Debug)]
pub enum DispatchError {
    /// Channel is full — caller should retry or use async dispatch
    ChannelFull(ContextMessage),
    /// Channel is closed — context has shut down
    ChannelClosed(ContextMessage),
}
576
577impl EventTypeRouter {
578    /// Non-blocking dispatch via `try_send()`.
579    ///
580    /// Routes the event to the correct context channel based on event type.
581    /// Returns immediately without waiting for channel capacity.
582    pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
583        let tx = self
584            .routes
585            .get(&*event.event_type)
586            .unwrap_or(&self.default_tx);
587        let msg = ContextMessage::Event(event);
588        match tx.try_send(msg) {
589            Ok(()) => Ok(()),
590            Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
591            Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
592        }
593    }
594
595    /// Blocking dispatch via `send().await`.
596    ///
597    /// Waits for channel capacity if the channel is full.
598    pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
599        let event_type = event.event_type.clone();
600        let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
601        tx.send(ContextMessage::Event(event))
602            .await
603            .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
604    }
605
606    /// Batch dispatch — non-blocking, returns errors for any events that could not be sent.
607    pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
608        let mut errors = Vec::new();
609        for event in events {
610            if let Err(e) = self.dispatch(event) {
611                errors.push(e);
612            }
613        }
614        errors
615    }
616}
617
/// Orchestrates multiple ContextRuntimes across OS threads.
///
/// Routes incoming events to the correct context based on event type
/// and stream assignments.
pub struct ContextOrchestrator {
    /// Senders to each context's event channel
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// Thread handles for each context
    handles: Vec<std::thread::JoinHandle<()>>,
    /// event_type -> context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,
    /// Direct event-type-to-channel router
    router: EventTypeRouter,
    /// Optional checkpoint coordinator for exactly-once semantics
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
636
637impl std::fmt::Debug for ContextOrchestrator {
638    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639        f.debug_struct("ContextOrchestrator")
640            .field("context_count", &self.context_txs.len())
641            .field("ingress_routing", &self.ingress_routing)
642            .field("handle_count", &self.handles.len())
643            .finish_non_exhaustive()
644    }
645}
646
647impl ContextOrchestrator {
    /// Build the orchestrator from engine state.
    ///
    /// Convenience wrapper around [`Self::build_with_checkpoint`] with
    /// checkpointing and recovery disabled.
    ///
    /// For each declared context:
    /// 1. Creates a bounded mpsc channel
    /// 2. Creates an Engine with only the streams assigned to that context
    /// 3. Spawns an OS thread with optional CPU affinity
    /// 4. Inside the thread: creates a single-threaded Tokio runtime
    ///    and runs the ContextRuntime event loop
    pub fn build(
        context_map: &ContextMap,
        program: &Program,
        output_tx: mpsc::Sender<Event>,
        channel_capacity: usize,
    ) -> Result<Self, String> {
        Self::build_with_checkpoint(
            context_map,
            program,
            output_tx,
            channel_capacity,
            None,
            None,
        )
    }
671
672    /// Build the orchestrator with optional checkpoint configuration and recovery state.
673    pub fn build_with_checkpoint(
674        context_map: &ContextMap,
675        program: &Program,
676        output_tx: mpsc::Sender<Event>,
677        channel_capacity: usize,
678        checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
679        recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
680    ) -> Result<Self, String> {
681        let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
682        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
683
684        // Create shutdown signal
685        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
686
687        // Determine default context
688        let default_context = context_map
689            .contexts()
690            .keys()
691            .next()
692            .cloned()
693            .unwrap_or_else(|| "default".to_string());
694
695        // Create cross-context senders: first pass to create all channels
696        let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
697            FxHashMap::default();
698        for ctx_name in context_map.contexts().keys() {
699            let (tx, rx) = mpsc::channel(channel_capacity);
700            context_txs.insert(ctx_name.clone(), tx);
701            context_rxs.insert(ctx_name.clone(), rx);
702        }
703
704        // Set up checkpoint coordinator if configured
705        let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
706        let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
707            let manager = CheckpointManager::new(store, config)
708                .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
709                .unwrap();
710            CheckpointCoordinator::new(manager, context_names.clone())
711        });
712        let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
713
714        // Build ingress routing: event_type -> context_name
715        let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
716
717        // First pass: route raw event types from stream sources to contexts
718        for stmt in &program.statements {
719            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
720                if let Some(ctx_name) = context_map.stream_context(name) {
721                    let event_types = Self::event_types_from_source(source);
722                    for et in event_types {
723                        ingress_routing.insert(et, ctx_name.to_string());
724                    }
725                }
726            }
727        }
728
729        // Second pass: route derived stream output types to consuming contexts
730        for stmt in &program.statements {
731            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
732                if let Some(ctx_name) = context_map.stream_context(name) {
733                    if let StreamSource::Ident(source_stream) = source {
734                        if context_map.stream_context(source_stream).is_some() {
735                            ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
736                        }
737                    }
738                }
739            }
740        }
741
742        // Third pass: validate cross-context emit targets
743        for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
744            if !context_txs.contains_key(target_ctx) {
745                warn!(
746                    "Cross-context emit targets unknown context '{}'",
747                    target_ctx
748                );
749            }
750        }
751
752        // Build EventTypeRouter: event_type → Sender directly (single lookup)
753        let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
754            FxHashMap::default();
755        for (event_type, ctx_name) in &ingress_routing {
756            if let Some(tx) = context_txs.get(ctx_name) {
757                event_type_txs.insert(event_type.clone(), tx.clone());
758            }
759        }
760        let default_tx = context_txs
761            .get(&default_context)
762            .cloned()
763            .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
764        let router = EventTypeRouter {
765            routes: Arc::new(event_type_txs),
766            default_tx,
767        };
768
769        // Clone context_map for use inside thread spawning
770        let context_map_clone = context_map.clone();
771
772        // Clone recovery state per context
773        let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
774            .map(|cp| cp.context_states.clone())
775            .unwrap_or_default();
776
777        // Spawn a thread for each context
778        for (ctx_name, config) in context_map.contexts() {
779            let rx = context_rxs
780                .remove(ctx_name)
781                .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
782
783            let ctx_output_tx = output_tx.clone();
784            let ctx_name_clone = ctx_name.clone();
785            let cores = config.cores.clone();
786
787            // Clone all context senders for cross-context forwarding
788            let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
789                .iter()
790                .map(|(k, v)| (k.clone(), v.clone()))
791                .collect();
792
793            // Filter the program to only include this context's streams
794            let filtered_program =
795                filter_program_for_context(program, ctx_name, &context_map_clone);
796            let ingress_routing_clone = ingress_routing.clone();
797            let shutdown_rx = shutdown_tx.subscribe();
798            let ctx_ack_tx = ack_tx.clone();
799            let ctx_recovery = recovery_states.get(ctx_name).cloned();
800
801            let handle = std::thread::Builder::new()
802                .name(format!("varpulis-ctx-{ctx_name}"))
803                .spawn(move || {
804                    // Set CPU affinity if specified
805                    if let Some(ref core_ids) = cores {
806                        Self::set_cpu_affinity(&ctx_name_clone, core_ids);
807                    }
808
809                    // Create a single-threaded Tokio runtime for this context
810                    let rt = tokio::runtime::Builder::new_current_thread()
811                        .enable_all()
812                        .build()
813                        .expect("Failed to create Tokio runtime for context");
814
815                    rt.block_on(async move {
816                        // Create engine for this context with filtered program
817                        let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
818                        let mut engine = Engine::new(engine_output_tx);
819                        engine.set_context_name(&ctx_name_clone);
820                        if let Err(e) = engine.load(&filtered_program) {
821                            error!(
822                                "Failed to load program for context '{}': {}",
823                                ctx_name_clone, e
824                            );
825                            return;
826                        }
827
828                        // Connect sinks (MQTT, Kafka, etc.) after load
829                        if let Err(e) = engine.connect_sinks().await {
830                            error!(
831                                "Failed to connect sinks for context '{}': {}",
832                                ctx_name_clone, e
833                            );
834                            return;
835                        }
836
837                        // Restore from checkpoint if available
838                        if let Some(cp) = ctx_recovery {
839                            if let Err(e) = engine.restore_checkpoint(&cp) {
840                                tracing::error!(
841                                    "Context {} failed to restore checkpoint: {}",
842                                    ctx_name_clone,
843                                    e
844                                );
845                                return;
846                            }
847                        }
848
849                        let mut ctx_runtime = ContextRuntime::new(
850                            ctx_name_clone,
851                            engine,
852                            ctx_output_tx,
853                            rx,
854                            engine_output_rx,
855                            all_txs,
856                            ingress_routing_clone,
857                            shutdown_rx,
858                        );
859
860                        if let Some(ack_tx) = ctx_ack_tx {
861                            ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
862                        }
863
864                        ctx_runtime.run().await;
865                    });
866                })
867                .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
868
869            handles.push(handle);
870        }
871
872        Ok(Self {
873            context_txs,
874            handles,
875            ingress_routing,
876            shutdown_tx,
877            router,
878            checkpoint_coordinator,
879        })
880    }
881
882    /// Route an incoming event to the correct context (async, waits on backpressure).
883    pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
884        self.router.dispatch_await(event).await
885    }
886
887    /// Non-blocking dispatch — returns `ChannelFull` instead of waiting.
888    pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
889        self.router.dispatch(event)
890    }
891
892    /// Batch dispatch — non-blocking, returns errors for events that could not be sent.
893    pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
894        self.router.dispatch_batch(events)
895    }
896
897    /// Get a cloneable router handle for direct multi-producer dispatch.
898    pub fn router(&self) -> EventTypeRouter {
899        self.router.clone()
900    }
901
902    /// Shut down all context threads.
903    ///
904    /// Sends shutdown signal, drops senders, and waits for threads to finish.
905    pub fn shutdown(self) {
906        // Signal all contexts to shut down
907        let _ = self.shutdown_tx.send(true);
908
909        // Drop all senders to unblock any recv() calls
910        drop(self.context_txs);
911
912        // Wait for all threads to finish
913        for handle in self.handles {
914            if let Err(e) = handle.join() {
915                error!("Context thread panicked: {:?}", e);
916            }
917        }
918
919        info!("All context runtimes shut down");
920    }
921
922    /// Trigger a checkpoint across all contexts.
923    ///
924    /// Sends a `CheckpointBarrier` to every context. Each context will snapshot
925    /// its engine state and send a `CheckpointAck` back. Call `try_complete_checkpoint()`
926    /// afterwards to drain acks and persist.
927    pub fn trigger_checkpoint(&mut self) {
928        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
929            coordinator.initiate(&self.context_txs);
930        }
931    }
932
933    /// Try to complete a pending checkpoint by draining acknowledgments.
934    ///
935    /// Returns `true` if a checkpoint was fully completed and persisted.
936    pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
937        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
938            let had_pending = coordinator.has_pending();
939            coordinator.try_complete()?;
940            // If we had a pending checkpoint and now it's gone, we completed it
941            Ok(had_pending && !coordinator.has_pending())
942        } else {
943            Ok(false)
944        }
945    }
946
947    /// Check if a periodic checkpoint should be triggered (based on configured interval).
948    pub fn should_checkpoint(&self) -> bool {
949        self.checkpoint_coordinator
950            .as_ref()
951            .is_some_and(|c| c.should_checkpoint())
952    }
953
954    /// Run one checkpoint cycle: trigger if due, then try to complete.
955    ///
956    /// Call this periodically from the main event loop (e.g., every second or on a timer).
957    pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
958        if self.should_checkpoint() {
959            self.trigger_checkpoint();
960        }
961        self.try_complete_checkpoint()
962    }
963
964    /// Get the names of all running contexts
965    pub fn context_names(&self) -> Vec<&str> {
966        self.context_txs.keys().map(|s| s.as_str()).collect()
967    }
968
    /// Get the ingress routing table (for testing/debugging).
    ///
    /// Maps ingress event-type names to the name of the context that consumes them.
    pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
        &self.ingress_routing
    }
973
974    /// Extract event types consumed by a stream source
975    fn event_types_from_source(source: &StreamSource) -> Vec<String> {
976        match source {
977            StreamSource::Ident(name) => vec![name.clone()],
978            StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
979            StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
980            StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
981            StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
982            StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
983            StreamSource::Sequence(decl) => {
984                decl.steps.iter().map(|s| s.event_type.clone()).collect()
985            }
986            StreamSource::Timer(_) => vec![],
987        }
988    }
989
990    /// Set CPU affinity for the current thread
991    fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
992        #[cfg(target_os = "linux")]
993        {
994            use core_affinity::CoreId;
995            if let Some(&first_core) = core_ids.first() {
996                let core_id = CoreId { id: first_core };
997                if core_affinity::set_for_current(core_id) {
998                    info!("Context '{}' pinned to core {}", ctx_name, first_core);
999                } else {
1000                    warn!(
1001                        "Failed to pin context '{}' to core {}",
1002                        ctx_name, first_core
1003                    );
1004                }
1005            }
1006        }
1007
1008        #[cfg(not(target_os = "linux"))]
1009        {
1010            tracing::debug!(
1011                "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
1012                ctx_name,
1013                core_ids
1014            );
1015        }
1016    }
1017}
1018
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh ContextMap is empty: no contexts registered, no stream assignments.
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    // Registering a context makes it retrievable by name with its core list intact.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    // Stream→context assignment lookups: assigned streams resolve, unknown ones don't.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    // Cross-context emit registration is keyed by (event type, op index) and
    // resolves to the target context name.
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Backward compatibility: programs with no context declarations must report
    // no contexts (single-threaded mode).
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    // A context config without CPU pinning is valid.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    // Multiple contexts can coexist, each with independent stream assignments.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    // A plain identifier source yields exactly that event type.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    // Filtering a program for one context keeps only that context's streams
    // while preserving every ContextDecl.
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        // Fixture: two contexts, two streams, one stream assigned to each.
        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        // Only StreamA (assigned to ctx1) should survive the filter.
        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        // Context declarations are shared metadata and must all remain.
        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    // Ingress routing must include derived event types: a stream's OUTPUT
    // ("RawData") routes to the context of its downstream consumer.
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        // Fixture: RawData (in "ingest") feeds Analysis (in "analytics").
        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        // Build spawns real context threads; shut them down at the end.
        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        // External event type routes to its consuming stream's context; the
        // derived "RawData" type routes to the downstream context.
        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    // Cross-context emits registered on the map are retrievable as configured
    // (no orchestrator needed for this check).
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Linux-only: the current process should report at least one allowed core.
    // NOTE(review): relies on a `verify_cpu_affinity` helper defined elsewhere
    // in this file (outside this view).
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}