// varpulis_runtime/context.rs
//! Context-based multi-threaded execution architecture.
//!
//! **Status: Production-ready, opt-in.**  Contexts are activated by declaring
//! `context` blocks in VPL.  When no contexts are declared, the engine runs in
//! single-threaded mode with zero overhead.
//!
//! Named contexts provide isolated execution domains. Each context runs on its own
//! OS thread with a single-threaded Tokio runtime, enabling true parallelism without
//! locks within a context. Cross-context communication uses bounded `mpsc` channels.
//! CPU affinity pinning is supported via `core_affinity`.
//!
//! Key types:
//! - [`ContextRuntime`] — runs a single context on a dedicated OS thread
//! - [`ContextOrchestrator`] — manages all contexts and routes events between them
//! - [`CheckpointCoordinator`] — coordinates consistent snapshots across contexts
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use rustc_hash::FxHashMap;
use tokio::sync::{mpsc, watch};
use tracing::{error, info, warn};
use varpulis_core::ast::{Program, Stmt, StreamSource};

use crate::engine::Engine;
use crate::event::{Event, SharedEvent};
use crate::persistence::{CheckpointConfig, CheckpointManager, EngineCheckpoint, StoreError};
/// Messages sent through context channels.
///
/// Wraps either a regular event or a checkpoint barrier for exactly-once semantics.
#[derive(Debug, Clone)]
pub enum ContextMessage {
    /// A regular event to process (reference-counted, shared zero-copy).
    Event(SharedEvent),
    /// A checkpoint barrier — triggers a state snapshot in the receiving context.
    CheckpointBarrier(CheckpointBarrier),
    /// Watermark update from an upstream context.
    WatermarkUpdate {
        /// Name of the upstream context that advanced its watermark.
        source_context: String,
        /// New watermark position in milliseconds (presumably epoch ms,
        /// matching barrier timestamps — confirm with producer).
        watermark_ms: i64,
    },
}
45
/// A checkpoint barrier flowing through the context DAG.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Id assigned by the coordinator (monotonically increasing).
    pub checkpoint_id: u64,
    /// Initiation time in milliseconds since the Unix epoch
    /// (taken from `chrono::Utc::now()` by the coordinator).
    pub timestamp_ms: i64,
}
52
/// Acknowledgment from a context after completing a checkpoint.
#[derive(Debug)]
pub struct CheckpointAck {
    /// Name of the acking context.
    pub context_name: String,
    /// Id of the checkpoint this ack belongs to.
    pub checkpoint_id: u64,
    /// Snapshot of the context's engine state at the barrier.
    pub engine_checkpoint: EngineCheckpoint,
}
60
/// Tracks a pending coordinated checkpoint across all contexts.
struct PendingCheckpoint {
    /// Id of the checkpoint being collected.
    checkpoint_id: u64,
    /// Initiation time in milliseconds since the Unix epoch.
    timestamp_ms: i64,
    /// Per-context engine snapshots received so far (context_name -> snapshot).
    acks: HashMap<String, EngineCheckpoint>,
    /// When the barriers were broadcast — used for staleness warnings.
    started_at: Instant,
}
68
/// Coordinates checkpoints across multiple contexts.
///
/// Sends `CheckpointBarrier` to all contexts, collects `CheckpointAck` responses,
/// and persists the assembled `Checkpoint` once all contexts have acknowledged.
pub struct CheckpointCoordinator {
    /// Persists completed checkpoints and decides when a new one is due.
    manager: CheckpointManager,
    /// Sender handed (cloned) to each context for returning acks.
    ack_tx: mpsc::Sender<CheckpointAck>,
    /// Receiver drained in `try_complete`.
    ack_rx: mpsc::Receiver<CheckpointAck>,
    /// Names of all participating contexts; a checkpoint completes once
    /// one ack per name has been received.
    context_names: Vec<String>,
    /// The in-flight checkpoint, if any (at most one at a time).
    pending: Option<PendingCheckpoint>,
    /// Id for the next checkpoint to initiate (starts at 1).
    next_checkpoint_id: u64,
}
81
82impl std::fmt::Debug for CheckpointCoordinator {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("CheckpointCoordinator")
85            .field("context_names", &self.context_names)
86            .field("next_checkpoint_id", &self.next_checkpoint_id)
87            .field("has_pending", &self.pending.is_some())
88            .finish_non_exhaustive()
89    }
90}
91
92impl CheckpointCoordinator {
93    /// Create a new coordinator for the given contexts.
94    pub fn new(manager: CheckpointManager, context_names: Vec<String>) -> Self {
95        let (ack_tx, ack_rx) = mpsc::channel(context_names.len() * 2);
96        Self {
97            manager,
98            ack_tx,
99            ack_rx,
100            context_names,
101            pending: None,
102            next_checkpoint_id: 1,
103        }
104    }
105
106    /// Get a sender for checkpoint acknowledgments (cloned into each context).
107    pub fn ack_sender(&self) -> mpsc::Sender<CheckpointAck> {
108        self.ack_tx.clone()
109    }
110
111    /// Initiate a new checkpoint by sending barriers to all contexts.
112    pub fn initiate(&mut self, context_txs: &FxHashMap<String, mpsc::Sender<ContextMessage>>) {
113        if self.pending.is_some() {
114            warn!("Checkpoint already in progress, skipping initiation");
115            return;
116        }
117
118        let checkpoint_id = self.next_checkpoint_id;
119        self.next_checkpoint_id += 1;
120        let timestamp_ms = chrono::Utc::now().timestamp_millis();
121
122        let barrier = CheckpointBarrier {
123            checkpoint_id,
124            timestamp_ms,
125        };
126
127        for (ctx_name, tx) in context_txs {
128            if let Err(e) = tx.try_send(ContextMessage::CheckpointBarrier(barrier.clone())) {
129                error!(
130                    "Failed to send checkpoint barrier to context '{}': {}",
131                    ctx_name, e
132                );
133            }
134        }
135
136        self.pending = Some(PendingCheckpoint {
137            checkpoint_id,
138            timestamp_ms,
139            acks: HashMap::new(),
140            started_at: Instant::now(),
141        });
142
143        info!("Initiated checkpoint {}", checkpoint_id);
144    }
145
146    /// Receive an acknowledgment. Returns a complete `Checkpoint` when all contexts have acked.
147    pub fn receive_ack(&mut self, ack: CheckpointAck) -> Option<crate::persistence::Checkpoint> {
148        let pending = self.pending.as_mut()?;
149
150        if ack.checkpoint_id != pending.checkpoint_id {
151            warn!(
152                "Received ack for checkpoint {} but expecting {}",
153                ack.checkpoint_id, pending.checkpoint_id
154            );
155            return None;
156        }
157
158        pending.acks.insert(ack.context_name, ack.engine_checkpoint);
159
160        if pending.acks.len() == self.context_names.len() {
161            let pending = self.pending.take().unwrap();
162            let mut context_states = HashMap::new();
163            for (name, cp) in pending.acks {
164                context_states.insert(name, cp);
165            }
166
167            Some(crate::persistence::Checkpoint {
168                id: pending.checkpoint_id,
169                timestamp_ms: pending.timestamp_ms,
170                events_processed: 0, // Filled from context states
171                window_states: HashMap::new(),
172                pattern_states: HashMap::new(),
173                metadata: HashMap::new(),
174                context_states,
175            })
176        } else {
177            None
178        }
179    }
180
181    /// Try to drain pending acks and complete the checkpoint.
182    pub fn try_complete(&mut self) -> Result<(), StoreError> {
183        while let Ok(ack) = self.ack_rx.try_recv() {
184            if let Some(checkpoint) = self.receive_ack(ack) {
185                self.manager.checkpoint(checkpoint)?;
186                return Ok(());
187            }
188        }
189
190        // Warn if a checkpoint has been pending for too long (> 30s)
191        if let Some(ref pending) = self.pending {
192            if pending.started_at.elapsed() > std::time::Duration::from_secs(30) {
193                warn!(
194                    "Checkpoint {} has been pending for {:.1}s — contexts may be blocked",
195                    pending.checkpoint_id,
196                    pending.started_at.elapsed().as_secs_f64()
197                );
198            }
199        }
200
201        Ok(())
202    }
203
204    /// Check if a checkpoint should be initiated based on interval.
205    pub fn should_checkpoint(&self) -> bool {
206        self.pending.is_none() && self.manager.should_checkpoint()
207    }
208
209    /// Whether a checkpoint is currently in progress (waiting for acks).
210    pub const fn has_pending(&self) -> bool {
211        self.pending.is_some()
212    }
213}
214
/// Configuration for a named context.
#[derive(Debug, Clone)]
pub struct ContextConfig {
    /// Context name as declared in VPL.
    pub name: String,
    /// Optional CPU core ids to pin this context's thread to
    /// (applied via `core_affinity` — see module docs).
    pub cores: Option<Vec<usize>>,
}
221
/// Maps streams/connectors to their assigned context.
///
/// Built during `Engine::load()` by processing `ContextDecl` statements
/// and `StreamOp::Context` / `Emit { target_context }` operations.
/// When this map is empty, the engine runs single-threaded (see module docs).
#[derive(Debug, Clone, Default)]
pub struct ContextMap {
    /// context_name -> config
    contexts: HashMap<String, ContextConfig>,
    /// stream_name -> context_name
    stream_assignments: HashMap<String, String>,
    /// (stream_name, emit_index) -> target_context for cross-context emits
    cross_context_emits: HashMap<(String, usize), String>,
}
235
236impl ContextMap {
237    pub fn new() -> Self {
238        Self::default()
239    }
240
241    /// Register a context declaration
242    pub fn register_context(&mut self, config: ContextConfig) {
243        self.contexts.insert(config.name.clone(), config);
244    }
245
246    /// Assign a stream to a context
247    pub fn assign_stream(&mut self, stream_name: String, context_name: String) {
248        self.stream_assignments.insert(stream_name, context_name);
249    }
250
251    /// Record a cross-context emit
252    pub fn add_cross_context_emit(
253        &mut self,
254        stream_name: String,
255        emit_index: usize,
256        target_context: String,
257    ) {
258        self.cross_context_emits
259            .insert((stream_name, emit_index), target_context);
260    }
261
262    /// Check if any contexts have been declared
263    pub fn has_contexts(&self) -> bool {
264        !self.contexts.is_empty()
265    }
266
267    /// Get all declared contexts
268    pub const fn contexts(&self) -> &HashMap<String, ContextConfig> {
269        &self.contexts
270    }
271
272    /// Get the context assignment for a stream
273    pub fn stream_context(&self, stream_name: &str) -> Option<&str> {
274        self.stream_assignments.get(stream_name).map(|s| s.as_str())
275    }
276
277    /// Get all stream assignments
278    pub const fn stream_assignments(&self) -> &HashMap<String, String> {
279        &self.stream_assignments
280    }
281
282    /// Get all cross-context emits
283    pub const fn cross_context_emits(&self) -> &HashMap<(String, usize), String> {
284        &self.cross_context_emits
285    }
286}
287
288/// Filter a program to keep only the streams assigned to a specific context.
289///
290/// Retains all non-stream statements (ContextDecl, ConnectorDecl, VarDecl,
291/// Assignment, FnDecl, EventDecl, PatternDecl, Config) since they may be
292/// needed by any context. Only `StreamDecl` statements are filtered based
293/// on context assignment.
294pub fn filter_program_for_context(
295    program: &Program,
296    context_name: &str,
297    context_map: &ContextMap,
298) -> Program {
299    let filtered_statements = program
300        .statements
301        .iter()
302        .filter(|stmt| {
303            if let Stmt::StreamDecl { name, .. } = &stmt.node {
304                // Keep the stream only if it's assigned to this context
305                match context_map.stream_context(name) {
306                    Some(ctx) => ctx == context_name,
307                    // Unassigned streams are kept in all contexts for backward compat
308                    None => true,
309                }
310            } else {
311                // Keep all non-stream statements
312                true
313            }
314        })
315        .cloned()
316        .collect();
317
318    Program {
319        statements: filtered_statements,
320    }
321}
322
/// Verify the CPU affinity of the current thread by reading /proc/self/status.
///
/// Returns the list of CPU cores the current thread is allowed to run on,
/// or `None` if the information cannot be read. Entries in the
/// `Cpus_allowed_list` field are either single core ids ("3") or inclusive
/// ranges ("0-7"); unparseable entries are silently skipped.
#[cfg(target_os = "linux")]
pub fn verify_cpu_affinity() -> Option<Vec<usize>> {
    use std::fs;

    let status = fs::read_to_string("/proc/self/status").ok()?;
    let line = status
        .lines()
        .find(|l| l.starts_with("Cpus_allowed_list:"))?;
    let list_str = line.split(':').nth(1)?.trim();

    let mut cores = Vec::new();
    for entry in list_str.split(',') {
        let entry = entry.trim();
        match entry.split_once('-') {
            Some((lo, hi)) => {
                if let (Ok(lo), Ok(hi)) = (lo.parse::<usize>(), hi.parse::<usize>()) {
                    cores.extend(lo..=hi);
                }
            }
            None => {
                if let Ok(core) = entry.parse::<usize>() {
                    cores.push(core);
                }
            }
        }
    }
    Some(cores)
}
351
/// A self-contained single-threaded runtime for one context.
///
/// Owns its streams, processes events without locks. Receives events from
/// its inbound channel and forwards cross-context events via outbound channels.
pub struct ContextRuntime {
    /// Context name (used for logging and checkpoint acks).
    name: String,
    /// Engine loaded with only this context's streams.
    engine: Engine,
    /// Main output channel (tenant/CLI)
    output_tx: mpsc::Sender<Event>,
    /// Inbound messages from orchestrator (events + barriers)
    event_rx: mpsc::Receiver<ContextMessage>,
    /// Engine's emitted events receiver
    engine_output_rx: mpsc::Receiver<Event>,
    /// Senders to all contexts (including self, for intra-context derived streams)
    all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// event_type → context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,
    /// Checkpoint ack sender (if coordinated checkpointing is enabled)
    ack_tx: Option<mpsc::Sender<CheckpointAck>>,
    /// Count of inbound events processed (one per `ContextMessage::Event`).
    events_processed: u64,
    /// Count of engine output events drained and routed.
    output_events_emitted: u64,
}
376
377impl std::fmt::Debug for ContextRuntime {
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        f.debug_struct("ContextRuntime")
380            .field("name", &self.name)
381            .field("ingress_routing", &self.ingress_routing)
382            .field("events_processed", &self.events_processed)
383            .field("output_events_emitted", &self.output_events_emitted)
384            .finish_non_exhaustive()
385    }
386}
387
388impl ContextRuntime {
389    /// Create a new context runtime
390    #[allow(clippy::too_many_arguments)]
391    pub const fn new(
392        name: String,
393        engine: Engine,
394        output_tx: mpsc::Sender<Event>,
395        event_rx: mpsc::Receiver<ContextMessage>,
396        engine_output_rx: mpsc::Receiver<Event>,
397        all_context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
398        ingress_routing: FxHashMap<String, String>,
399        shutdown_rx: watch::Receiver<bool>,
400    ) -> Self {
401        Self {
402            name,
403            engine,
404            output_tx,
405            event_rx,
406            engine_output_rx,
407            all_context_txs,
408            ingress_routing,
409            shutdown_rx,
410            ack_tx: None,
411            events_processed: 0,
412            output_events_emitted: 0,
413        }
414    }
415
416    /// Set the checkpoint acknowledgment sender for coordinated checkpointing.
417    pub fn with_ack_sender(mut self, ack_tx: mpsc::Sender<CheckpointAck>) -> Self {
418        self.ack_tx = Some(ack_tx);
419        self
420    }
421
    /// Drain engine output events and route them to consuming contexts
    /// and the main output channel.
    ///
    /// Non-blocking throughout (`try_recv`/`try_send`): events may be
    /// silently dropped when a downstream channel is full or closed.
    fn drain_and_route_output(&mut self) {
        while let Ok(output_event) = self.engine_output_rx.try_recv() {
            self.output_events_emitted += 1;

            // Route to the consuming context, if this event type has one.
            if let Some(target_ctx) = self.ingress_routing.get(&*output_event.event_type) {
                if let Some(tx) = self.all_context_txs.get(target_ctx) {
                    let shared = Arc::new(output_event);
                    let _ = tx.try_send(ContextMessage::Event(Arc::clone(&shared)));
                    // Recover an owned Event for the output channel. If the
                    // try_send above succeeded, the channel still holds a
                    // clone of the Arc, so try_unwrap fails and we fall back
                    // to a deep clone; if it failed, we are the sole owner
                    // and unwrap without copying.
                    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
                    let _ = self.output_tx.try_send(owned);
                    continue;
                }
            }

            // No cross-context consumer: forward to the main output channel only.
            let _ = self.output_tx.try_send(output_event);
        }
    }
444
445    /// Handle a checkpoint barrier by snapshotting engine state and sending ack.
446    async fn handle_checkpoint_barrier(&self, barrier: CheckpointBarrier) {
447        if let Some(ref ack_tx) = self.ack_tx {
448            let checkpoint = self.engine.create_checkpoint();
449            let _ = ack_tx
450                .send(CheckpointAck {
451                    context_name: self.name.clone(),
452                    checkpoint_id: barrier.checkpoint_id,
453                    engine_checkpoint: checkpoint,
454                })
455                .await;
456        }
457    }
458
    /// Run the event loop. Blocks the current thread.
    ///
    /// Receives events from the inbound channel, processes them through
    /// the engine, and forwards cross-context events as needed.
    ///
    /// If the engine has session windows, a periodic sweep timer runs
    /// every `gap` duration to close stale sessions. This ensures sessions
    /// are emitted even when no new events arrive.
    ///
    /// Returns when the shutdown signal flips to `true` or the inbound
    /// channel closes. On exit, all cross-context senders are dropped so
    /// peer contexts can observe channel closure and shut down in turn.
    pub async fn run(&mut self) {
        info!("Context '{}' runtime started", self.name);

        // Log the actual affinity mask so misconfigured pinning is visible.
        #[cfg(target_os = "linux")]
        if let Some(cores) = verify_cpu_affinity() {
            info!("Context '{}' running on cores {:?}", self.name, cores);
        }

        // Compute sweep interval from engine's session window gaps;
        // fall back to 60s when no gap converts to a std Duration.
        let has_sessions = self.engine.has_session_windows();
        let sweep_interval = self
            .engine
            .min_session_gap()
            .and_then(|d| d.to_std().ok())
            .unwrap_or(std::time::Duration::from_secs(60));

        let mut sweep_timer = tokio::time::interval(sweep_interval);
        // Delay (rather than burst) if ticks are missed under load.
        sweep_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // Skip the first immediate tick
        sweep_timer.tick().await;

        loop {
            tokio::select! {
                // `biased` gives deterministic branch priority: shutdown is
                // checked first, then the session sweep, then inbound messages.
                biased;

                _ = self.shutdown_rx.changed() => {
                    if *self.shutdown_rx.borrow() {
                        info!("Context '{}' received shutdown signal", self.name);
                        // On shutdown: flush all remaining sessions
                        if has_sessions {
                            if let Err(e) = self.engine.flush_expired_sessions().await {
                                error!("Context '{}' shutdown session flush error: {}", self.name, e);
                            }
                            self.drain_and_route_output();
                        }
                        break;
                    }
                }

                // Periodic sweep closes stale sessions even when no new
                // events arrive; branch is disabled without session windows.
                _ = sweep_timer.tick(), if has_sessions => {
                    match self.engine.flush_expired_sessions().await {
                        Ok(()) => {}
                        Err(e) => {
                            error!("Context '{}' session sweep error: {}", self.name, e);
                        }
                    }
                    self.drain_and_route_output();
                }

                msg = self.event_rx.recv() => {
                    match msg {
                        Some(ContextMessage::Event(event)) => {
                            self.events_processed += 1;

                            // Process the event through the engine (zero-copy via SharedEvent)
                            match self.engine.process_shared(Arc::clone(&event)).await {
                                Ok(()) => {}
                                Err(e) => {
                                    error!("Context '{}' processing error: {}", self.name, e);
                                }
                            }

                            self.drain_and_route_output();
                        }
                        Some(ContextMessage::CheckpointBarrier(barrier)) => {
                            self.handle_checkpoint_barrier(barrier).await;
                        }
                        Some(ContextMessage::WatermarkUpdate { source_context, watermark_ms }) => {
                            // Feed watermark into engine's tracker (Phase 2E)
                            let _ = self.engine.advance_external_watermark(&source_context, watermark_ms).await;
                        }
                        None => {
                            // Channel closed — every sender dropped; no more input.
                            break;
                        }
                    }
                }
            }
        }

        // Drop cross-context senders so other contexts can shut down too
        self.all_context_txs.clear();

        info!(
            "Context '{}' runtime stopped (processed {} events, emitted {} output events)",
            self.name, self.events_processed, self.output_events_emitted
        );
    }
555}
556
/// Direct event-type-to-channel router for non-blocking dispatch.
///
/// Maps `event_type → Sender<ContextMessage>` directly (single HashMap lookup),
/// uses `try_send()` for non-blocking dispatch, and is cheaply cloneable
/// via `Arc<HashMap>` for multi-producer scenarios.
#[derive(Clone)]
pub struct EventTypeRouter {
    /// event_type → context channel; shared so clones are O(1).
    routes: Arc<FxHashMap<String, mpsc::Sender<ContextMessage>>>,
    /// Fallback channel for event types with no explicit route.
    default_tx: mpsc::Sender<ContextMessage>,
}
567
568impl std::fmt::Debug for EventTypeRouter {
569    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570        f.debug_struct("EventTypeRouter")
571            .field("route_count", &self.routes.len())
572            .field("route_keys", &self.routes.keys().collect::<Vec<_>>())
573            .finish_non_exhaustive()
574    }
575}
576
/// Errors returned by non-blocking dispatch methods.
///
/// Each variant carries the undelivered message back to the caller so it
/// can be retried (e.g. via async dispatch) or dropped deliberately.
#[derive(Debug)]
pub enum DispatchError {
    /// Channel is full — caller should retry or use async dispatch
    ChannelFull(ContextMessage),
    /// Channel is closed — context has shut down
    ChannelClosed(ContextMessage),
}
585
586impl EventTypeRouter {
587    /// Non-blocking dispatch via `try_send()`.
588    ///
589    /// Routes the event to the correct context channel based on event type.
590    /// Returns immediately without waiting for channel capacity.
591    pub fn dispatch(&self, event: SharedEvent) -> Result<(), DispatchError> {
592        let tx = self
593            .routes
594            .get(&*event.event_type)
595            .unwrap_or(&self.default_tx);
596        let msg = ContextMessage::Event(event);
597        match tx.try_send(msg) {
598            Ok(()) => Ok(()),
599            Err(mpsc::error::TrySendError::Full(msg)) => Err(DispatchError::ChannelFull(msg)),
600            Err(mpsc::error::TrySendError::Closed(msg)) => Err(DispatchError::ChannelClosed(msg)),
601        }
602    }
603
604    /// Blocking dispatch via `send().await`.
605    ///
606    /// Waits for channel capacity if the channel is full.
607    pub async fn dispatch_await(&self, event: SharedEvent) -> Result<(), String> {
608        let event_type = event.event_type.clone();
609        let tx = self.routes.get(&*event_type).unwrap_or(&self.default_tx);
610        tx.send(ContextMessage::Event(event))
611            .await
612            .map_err(|e| format!("Failed to send event type '{event_type}': {e}"))
613    }
614
615    /// Batch dispatch — non-blocking, returns errors for any events that could not be sent.
616    pub fn dispatch_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
617        let mut errors = Vec::new();
618        for event in events {
619            if let Err(e) = self.dispatch(event) {
620                errors.push(e);
621            }
622        }
623        errors
624    }
625}
626
/// Orchestrates multiple ContextRuntimes across OS threads.
///
/// Routes incoming events to the correct context based on event type
/// and stream assignments.
pub struct ContextOrchestrator {
    /// Senders to each context's event channel
    context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>>,
    /// Thread handles for each context
    handles: Vec<std::thread::JoinHandle<()>>,
    /// event_type -> context_name routing table
    ingress_routing: FxHashMap<String, String>,
    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,
    /// Direct event-type-to-channel router
    router: EventTypeRouter,
    /// Optional checkpoint coordinator for exactly-once semantics
    checkpoint_coordinator: Option<CheckpointCoordinator>,
}
645
646impl std::fmt::Debug for ContextOrchestrator {
647    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
648        f.debug_struct("ContextOrchestrator")
649            .field("context_count", &self.context_txs.len())
650            .field("ingress_routing", &self.ingress_routing)
651            .field("handle_count", &self.handles.len())
652            .finish_non_exhaustive()
653    }
654}
655
656impl ContextOrchestrator {
657    /// Build the orchestrator from engine state.
658    ///
659    /// For each declared context:
660    /// 1. Creates a bounded mpsc channel
661    /// 2. Creates an Engine with only the streams assigned to that context
662    /// 3. Spawns an OS thread with optional CPU affinity
663    /// 4. Inside the thread: creates a single-threaded Tokio runtime
664    ///    and runs the ContextRuntime event loop
665    pub fn build(
666        context_map: &ContextMap,
667        program: &Program,
668        output_tx: mpsc::Sender<Event>,
669        channel_capacity: usize,
670    ) -> Result<Self, String> {
671        Self::build_with_checkpoint(
672            context_map,
673            program,
674            output_tx,
675            channel_capacity,
676            None,
677            None,
678        )
679    }
680
681    /// Build the orchestrator with optional checkpoint configuration and recovery state.
682    pub fn build_with_checkpoint(
683        context_map: &ContextMap,
684        program: &Program,
685        output_tx: mpsc::Sender<Event>,
686        channel_capacity: usize,
687        checkpoint_config: Option<(CheckpointConfig, Arc<dyn crate::persistence::StateStore>)>,
688        recovery_checkpoint: Option<&crate::persistence::Checkpoint>,
689    ) -> Result<Self, String> {
690        let mut context_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = FxHashMap::default();
691        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
692
693        // Create shutdown signal
694        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
695
696        // Determine default context
697        let default_context = context_map
698            .contexts()
699            .keys()
700            .next()
701            .cloned()
702            .unwrap_or_else(|| "default".to_string());
703
704        // Create cross-context senders: first pass to create all channels
705        let mut context_rxs: FxHashMap<String, mpsc::Receiver<ContextMessage>> =
706            FxHashMap::default();
707        for ctx_name in context_map.contexts().keys() {
708            let (tx, rx) = mpsc::channel(channel_capacity);
709            context_txs.insert(ctx_name.clone(), tx);
710            context_rxs.insert(ctx_name.clone(), rx);
711        }
712
713        // Set up checkpoint coordinator if configured
714        let context_names: Vec<String> = context_map.contexts().keys().cloned().collect();
715        let checkpoint_coordinator = checkpoint_config.map(|(config, store)| {
716            let manager = CheckpointManager::new(store, config)
717                .map_err(|e| format!("Failed to create checkpoint manager: {e}"))
718                .unwrap();
719            CheckpointCoordinator::new(manager, context_names.clone())
720        });
721        let ack_tx = checkpoint_coordinator.as_ref().map(|c| c.ack_sender());
722
723        // Build ingress routing: event_type -> context_name
724        let mut ingress_routing: FxHashMap<String, String> = FxHashMap::default();
725
726        // First pass: route raw event types from stream sources to contexts
727        for stmt in &program.statements {
728            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
729                if let Some(ctx_name) = context_map.stream_context(name) {
730                    let event_types = Self::event_types_from_source(source);
731                    for et in event_types {
732                        ingress_routing.insert(et, ctx_name.to_string());
733                    }
734                }
735            }
736        }
737
738        // Second pass: route derived stream output types to consuming contexts
739        for stmt in &program.statements {
740            if let Stmt::StreamDecl { name, source, .. } = &stmt.node {
741                if let Some(ctx_name) = context_map.stream_context(name) {
742                    if let StreamSource::Ident(source_stream) = source {
743                        if context_map.stream_context(source_stream).is_some() {
744                            ingress_routing.insert(source_stream.clone(), ctx_name.to_string());
745                        }
746                    }
747                }
748            }
749        }
750
751        // Third pass: validate cross-context emit targets
752        for ((_stream_name, _emit_idx), target_ctx) in context_map.cross_context_emits() {
753            if !context_txs.contains_key(target_ctx) {
754                warn!(
755                    "Cross-context emit targets unknown context '{}'",
756                    target_ctx
757                );
758            }
759        }
760
761        // Build EventTypeRouter: event_type → Sender directly (single lookup)
762        let mut event_type_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> =
763            FxHashMap::default();
764        for (event_type, ctx_name) in &ingress_routing {
765            if let Some(tx) = context_txs.get(ctx_name) {
766                event_type_txs.insert(event_type.clone(), tx.clone());
767            }
768        }
769        let default_tx = context_txs
770            .get(&default_context)
771            .cloned()
772            .ok_or_else(|| format!("No channel for default context '{default_context}'"))?;
773        let router = EventTypeRouter {
774            routes: Arc::new(event_type_txs),
775            default_tx,
776        };
777
778        // Clone context_map for use inside thread spawning
779        let context_map_clone = context_map.clone();
780
781        // Clone recovery state per context
782        let recovery_states: HashMap<String, EngineCheckpoint> = recovery_checkpoint
783            .map(|cp| cp.context_states.clone())
784            .unwrap_or_default();
785
786        // Spawn a thread for each context
787        for (ctx_name, config) in context_map.contexts() {
788            let rx = context_rxs
789                .remove(ctx_name)
790                .ok_or_else(|| format!("No receiver for context {ctx_name}"))?;
791
792            let ctx_output_tx = output_tx.clone();
793            let ctx_name_clone = ctx_name.clone();
794            let cores = config.cores.clone();
795
796            // Clone all context senders for cross-context forwarding
797            let all_txs: FxHashMap<String, mpsc::Sender<ContextMessage>> = context_txs
798                .iter()
799                .map(|(k, v)| (k.clone(), v.clone()))
800                .collect();
801
802            // Filter the program to only include this context's streams
803            let filtered_program =
804                filter_program_for_context(program, ctx_name, &context_map_clone);
805            let ingress_routing_clone = ingress_routing.clone();
806            let shutdown_rx = shutdown_tx.subscribe();
807            let ctx_ack_tx = ack_tx.clone();
808            let ctx_recovery = recovery_states.get(ctx_name).cloned();
809
810            let handle = std::thread::Builder::new()
811                .name(format!("varpulis-ctx-{ctx_name}"))
812                .spawn(move || {
813                    // Set CPU affinity if specified
814                    if let Some(ref core_ids) = cores {
815                        Self::set_cpu_affinity(&ctx_name_clone, core_ids);
816                    }
817
818                    // Create a single-threaded Tokio runtime for this context
819                    let rt = tokio::runtime::Builder::new_current_thread()
820                        .enable_all()
821                        .build()
822                        .expect("Failed to create Tokio runtime for context");
823
824                    rt.block_on(async move {
825                        // Create engine for this context with filtered program
826                        let (engine_output_tx, engine_output_rx) = mpsc::channel(1000);
827                        let mut engine = Engine::new(engine_output_tx);
828                        engine.set_context_name(&ctx_name_clone);
829                        if let Err(e) = engine.load(&filtered_program) {
830                            error!(
831                                "Failed to load program for context '{}': {}",
832                                ctx_name_clone, e
833                            );
834                            return;
835                        }
836
837                        // Connect sinks (MQTT, Kafka, etc.) after load
838                        if let Err(e) = engine.connect_sinks().await {
839                            error!(
840                                "Failed to connect sinks for context '{}': {}",
841                                ctx_name_clone, e
842                            );
843                            return;
844                        }
845
846                        // Restore from checkpoint if available
847                        if let Some(cp) = ctx_recovery {
848                            if let Err(e) = engine.restore_checkpoint(&cp) {
849                                tracing::error!(
850                                    "Context {} failed to restore checkpoint: {}",
851                                    ctx_name_clone,
852                                    e
853                                );
854                                return;
855                            }
856                        }
857
858                        let mut ctx_runtime = ContextRuntime::new(
859                            ctx_name_clone,
860                            engine,
861                            ctx_output_tx,
862                            rx,
863                            engine_output_rx,
864                            all_txs,
865                            ingress_routing_clone,
866                            shutdown_rx,
867                        );
868
869                        if let Some(ack_tx) = ctx_ack_tx {
870                            ctx_runtime = ctx_runtime.with_ack_sender(ack_tx);
871                        }
872
873                        ctx_runtime.run().await;
874                    });
875                })
876                .map_err(|e| format!("Failed to spawn context thread: {e}"))?;
877
878            handles.push(handle);
879        }
880
881        Ok(Self {
882            context_txs,
883            handles,
884            ingress_routing,
885            shutdown_tx,
886            router,
887            checkpoint_coordinator,
888        })
889    }
890
    /// Route an incoming event to the correct context (async, waits on backpressure).
    ///
    /// Delegates to the internal [`EventTypeRouter`], which resolves the event's
    /// type to a per-context channel sender (falling back to the default
    /// context's sender) and awaits channel capacity instead of dropping the
    /// event when the target context is busy.
    pub async fn process(&self, event: SharedEvent) -> Result<(), String> {
        self.router.dispatch_await(event).await
    }
895
    /// Non-blocking dispatch — returns `ChannelFull` instead of waiting.
    ///
    /// Same routing as [`Self::process`], but when the target context's bounded
    /// channel is at capacity a [`DispatchError`] is returned immediately rather
    /// than awaiting.
    pub fn try_process(&self, event: SharedEvent) -> Result<(), DispatchError> {
        self.router.dispatch(event)
    }
900
    /// Batch dispatch — non-blocking, returns errors for events that could not be sent.
    ///
    /// An empty result vector means every event in the batch was accepted by
    /// its target context's channel.
    pub fn process_batch(&self, events: Vec<SharedEvent>) -> Vec<DispatchError> {
        self.router.dispatch_batch(events)
    }
905
    /// Get a cloneable router handle for direct multi-producer dispatch.
    ///
    /// The router holds an `Arc`-shared routing table plus cloned channel
    /// senders, so clones are cheap and can be handed to producer tasks or
    /// threads without going through the orchestrator.
    pub fn router(&self) -> EventTypeRouter {
        self.router.clone()
    }
910
911    /// Shut down all context threads.
912    ///
913    /// Sends shutdown signal, drops senders, and waits for threads to finish.
914    pub fn shutdown(self) {
915        // Signal all contexts to shut down
916        let _ = self.shutdown_tx.send(true);
917
918        // Drop all senders to unblock any recv() calls
919        drop(self.context_txs);
920
921        // Wait for all threads to finish
922        for handle in self.handles {
923            if let Err(e) = handle.join() {
924                error!("Context thread panicked: {:?}", e);
925            }
926        }
927
928        info!("All context runtimes shut down");
929    }
930
931    /// Trigger a checkpoint across all contexts.
932    ///
933    /// Sends a `CheckpointBarrier` to every context. Each context will snapshot
934    /// its engine state and send a `CheckpointAck` back. Call `try_complete_checkpoint()`
935    /// afterwards to drain acks and persist.
936    pub fn trigger_checkpoint(&mut self) {
937        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
938            coordinator.initiate(&self.context_txs);
939        }
940    }
941
942    /// Try to complete a pending checkpoint by draining acknowledgments.
943    ///
944    /// Returns `true` if a checkpoint was fully completed and persisted.
945    pub fn try_complete_checkpoint(&mut self) -> Result<bool, StoreError> {
946        if let Some(ref mut coordinator) = self.checkpoint_coordinator {
947            let had_pending = coordinator.has_pending();
948            coordinator.try_complete()?;
949            // If we had a pending checkpoint and now it's gone, we completed it
950            Ok(had_pending && !coordinator.has_pending())
951        } else {
952            Ok(false)
953        }
954    }
955
956    /// Check if a periodic checkpoint should be triggered (based on configured interval).
957    pub fn should_checkpoint(&self) -> bool {
958        self.checkpoint_coordinator
959            .as_ref()
960            .is_some_and(|c| c.should_checkpoint())
961    }
962
963    /// Run one checkpoint cycle: trigger if due, then try to complete.
964    ///
965    /// Call this periodically from the main event loop (e.g., every second or on a timer).
966    pub fn checkpoint_tick(&mut self) -> Result<bool, StoreError> {
967        if self.should_checkpoint() {
968            self.trigger_checkpoint();
969        }
970        self.try_complete_checkpoint()
971    }
972
973    /// Get the names of all running contexts
974    pub fn context_names(&self) -> Vec<&str> {
975        self.context_txs.keys().map(|s| s.as_str()).collect()
976    }
977
    /// Get the ingress routing table (for testing/debugging).
    ///
    /// Maps an ingress event type to the name of the context whose streams
    /// consume it.
    pub const fn ingress_routing(&self) -> &FxHashMap<String, String> {
        &self.ingress_routing
    }
982
983    /// Extract event types consumed by a stream source
984    fn event_types_from_source(source: &StreamSource) -> Vec<String> {
985        match source {
986            StreamSource::Ident(name) => vec![name.clone()],
987            StreamSource::IdentWithAlias { name, .. } => vec![name.clone()],
988            StreamSource::AllWithAlias { name, .. } => vec![name.clone()],
989            StreamSource::FromConnector { event_type, .. } => vec![event_type.clone()],
990            StreamSource::Merge(decls) => decls.iter().map(|d| d.source.clone()).collect(),
991            StreamSource::Join(clauses) => clauses.iter().map(|c| c.source.clone()).collect(),
992            StreamSource::Sequence(decl) => {
993                decl.steps.iter().map(|s| s.event_type.clone()).collect()
994            }
995            StreamSource::Timer(_) => vec![],
996        }
997    }
998
999    /// Set CPU affinity for the current thread
1000    fn set_cpu_affinity(ctx_name: &str, core_ids: &[usize]) {
1001        #[cfg(target_os = "linux")]
1002        {
1003            use core_affinity::CoreId;
1004            if let Some(&first_core) = core_ids.first() {
1005                let core_id = CoreId { id: first_core };
1006                if core_affinity::set_for_current(core_id) {
1007                    info!("Context '{}' pinned to core {}", ctx_name, first_core);
1008                } else {
1009                    warn!(
1010                        "Failed to pin context '{}' to core {}",
1011                        ctx_name, first_core
1012                    );
1013                }
1014            }
1015        }
1016
1017        #[cfg(not(target_os = "linux"))]
1018        {
1019            tracing::debug!(
1020                "CPU affinity not supported on this platform for context '{}' (cores: {:?})",
1021                ctx_name,
1022                core_ids
1023            );
1024        }
1025    }
1026}
1027
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly-created ContextMap is empty: no contexts declared, so the
    // engine would run in single-threaded mode.
    #[test]
    fn test_context_map_new() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
        assert!(map.contexts().is_empty());
    }

    // Registering a context stores its config (name + optional core list)
    // and flips has_contexts() to true.
    #[test]
    fn test_context_map_register() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        assert!(map.has_contexts());
        assert_eq!(map.contexts().len(), 1);
        let config = map.contexts().get("ingestion").unwrap();
        assert_eq!(config.cores, Some(vec![0, 1]));
    }

    // stream_context() resolves an assigned stream to its context name and
    // returns None for streams that were never assigned.
    #[test]
    fn test_context_map_stream_assignment() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "fast".to_string(),
            cores: None,
        });
        map.assign_stream("RawEvents".to_string(), "fast".to_string());
        assert_eq!(map.stream_context("RawEvents"), Some("fast"));
        assert_eq!(map.stream_context("Unknown"), None);
    }

    // Cross-context emits are keyed by (stream name, emit index) and map to
    // the target context's name.
    #[test]
    fn test_context_map_cross_context_emit() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        map.add_cross_context_emit("Alerts".to_string(), 0, "analytics".to_string());
        let emits = map.cross_context_emits();
        assert_eq!(
            emits.get(&("Alerts".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Backward compatibility: with no `context` blocks declared, the map
    // reports no contexts.
    #[test]
    fn test_no_context_backward_compat() {
        let map = ContextMap::new();
        assert!(!map.has_contexts());
    }

    // A context config without a core list is valid — no CPU pinning will occur.
    #[test]
    fn test_context_config_no_cores() {
        let config = ContextConfig {
            name: "test".to_string(),
            cores: None,
        };
        assert_eq!(config.name, "test");
        assert!(config.cores.is_none());
    }

    // Multiple contexts coexist, each with its own core list and
    // independently-assigned streams.
    #[test]
    fn test_context_map_multiple_contexts() {
        let mut map = ContextMap::new();
        map.register_context(ContextConfig {
            name: "ingestion".to_string(),
            cores: Some(vec![0, 1]),
        });
        map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: Some(vec![2, 3]),
        });
        map.register_context(ContextConfig {
            name: "alerts".to_string(),
            cores: Some(vec![4]),
        });

        assert_eq!(map.contexts().len(), 3);

        map.assign_stream("RawEvents".to_string(), "ingestion".to_string());
        map.assign_stream("Analysis".to_string(), "analytics".to_string());
        map.assign_stream("Notifications".to_string(), "alerts".to_string());

        assert_eq!(map.stream_context("RawEvents"), Some("ingestion"));
        assert_eq!(map.stream_context("Analysis"), Some("analytics"));
        assert_eq!(map.stream_context("Notifications"), Some("alerts"));
    }

    // event_types_from_source() extracts the consumed event type from a
    // simple `Ident` stream source.
    #[test]
    fn test_context_orchestrator_event_types_from_source() {
        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "SensorReading".to_string(),
        ));
        assert_eq!(types, vec!["SensorReading"]);

        let types = ContextOrchestrator::event_types_from_source(&StreamSource::Ident(
            "ProcessedEvents".to_string(),
        ));
        assert_eq!(types, vec!["ProcessedEvents"]);
    }

    // filter_program_for_context() keeps only the streams assigned to the
    // given context while preserving every ContextDecl, so each per-context
    // engine still sees the full context topology.
    #[test]
    fn test_filter_program_for_context() {
        use varpulis_core::span::Spanned;

        // Program with two contexts and one stream assigned to each.
        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx1".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ctx2".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamA".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventA".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "StreamB".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("EventB".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ctx1".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "ctx2".to_string(),
            cores: None,
        });
        context_map.assign_stream("StreamA".to_string(), "ctx1".to_string());
        context_map.assign_stream("StreamB".to_string(), "ctx2".to_string());

        let filtered = filter_program_for_context(&program, "ctx1", &context_map);

        // Only StreamA (assigned to ctx1) should survive the filter.
        let stream_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::StreamDecl { .. }))
            .count();
        assert_eq!(stream_count, 1, "ctx1 should have exactly 1 stream");

        let has_stream_a = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamA"));
        assert!(has_stream_a, "ctx1 should contain StreamA");

        let has_stream_b = filtered
            .statements
            .iter()
            .any(|s| matches!(&s.node, Stmt::StreamDecl { name, .. } if name == "StreamB"));
        assert!(!has_stream_b, "ctx1 should NOT contain StreamB");

        // ContextDecls are topology, not streams — all must be preserved.
        let context_decl_count = filtered
            .statements
            .iter()
            .filter(|s| matches!(s.node, Stmt::ContextDecl { .. }))
            .count();
        assert_eq!(
            context_decl_count, 2,
            "All ContextDecls should be preserved"
        );
    }

    // Ingress routing must include *derived* event types: "Analysis" consumes
    // the output of stream "RawData", so events of type "RawData" must route
    // to the "analytics" context even though the RawData stream itself lives
    // in "ingest". Spawns real context threads, so it shuts down at the end.
    #[test]
    fn test_ingress_routing_includes_derived_types() {
        use varpulis_core::span::Spanned;

        let program = Program {
            statements: vec![
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "ingest".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::ContextDecl {
                        name: "analytics".to_string(),
                        cores: None,
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "RawData".to_string(),
                        type_annotation: None,
                        source: StreamSource::Ident("SensorReading".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
                Spanned {
                    node: Stmt::StreamDecl {
                        name: "Analysis".to_string(),
                        type_annotation: None,
                        // Consumes the derived "RawData" type produced above.
                        source: StreamSource::Ident("RawData".to_string()),
                        ops: vec![],
                        op_spans: vec![],
                    },
                    span: varpulis_core::span::Span::dummy(),
                },
            ],
        };

        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.assign_stream("Analysis".to_string(), "analytics".to_string());

        // Channel capacity 100 (the orchestrator's per-context buffer size).
        let (output_tx, _output_rx) = mpsc::channel(10);
        let orchestrator =
            ContextOrchestrator::build(&context_map, &program, output_tx, 100).unwrap();

        let routing = orchestrator.ingress_routing();

        assert_eq!(routing.get("SensorReading"), Some(&"ingest".to_string()));
        assert_eq!(routing.get("RawData"), Some(&"analytics".to_string()));

        orchestrator.shutdown();
    }

    // A cross-context emit registered on a stream round-trips through
    // cross_context_emits().
    #[test]
    fn test_ingress_routing_cross_context_emits() {
        let mut context_map = ContextMap::new();
        context_map.register_context(ContextConfig {
            name: "ingest".to_string(),
            cores: None,
        });
        context_map.register_context(ContextConfig {
            name: "analytics".to_string(),
            cores: None,
        });
        context_map.assign_stream("RawData".to_string(), "ingest".to_string());
        context_map.add_cross_context_emit("RawData".to_string(), 0, "analytics".to_string());

        let emits = context_map.cross_context_emits();
        assert_eq!(
            emits.get(&("RawData".to_string(), 0)),
            Some(&"analytics".to_string())
        );
    }

    // Linux-only: the current process should be able to read its CPU
    // affinity mask and report at least one allowed core.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_cpu_affinity_verification() {
        let cores = verify_cpu_affinity();
        assert!(cores.is_some(), "Should be able to read CPU affinity");
        let cores = cores.unwrap();
        assert!(!cores.is_empty(), "Should have at least one allowed core");
    }
}