
laminar_core/tpc/core_handle.rs

//! # Core Handle
//!
//! Manages a single core's reactor thread with SPSC queue communication.
//!
//! ## Architecture
//!
//! Each `CoreHandle` spawns a dedicated thread that:
//! 1. Sets CPU affinity to pin to a specific core
//! 2. Creates a `Reactor` with its own state partition
//! 3. Optionally initializes an `io_uring` ring for async I/O (Linux only)
//! 4. Drains the inbox SPSC queue for incoming events
//! 5. Processes events through the reactor
//! 6. Pushes outputs to the outbox SPSC queue
//!
//! ## Communication
//!
//! - **Inbox**: Main thread → Core thread (events, watermarks, commands)
//! - **Outbox**: Core thread → Main thread (outputs)
//!
//! Both use lock-free SPSC queues for minimal latency.
//!
//! ## `io_uring` Integration
//!
//! On Linux with the `io-uring` feature enabled, each core thread can have its own
//! `io_uring` ring for high-performance async I/O. This enables:
//! - SQPOLL mode for syscall-free submission
//! - Registered buffers for zero-copy I/O
//! - Per-core I/O isolation for the thread-per-core architecture
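//!
//! ## Example
//!
//! A minimal sketch of driving a single core from the main thread (the
//! operator type, event value, and error handling are placeholders):
//!
//! ```rust,ignore
//! let config = CoreConfig { core_id: 0, ..Default::default() };
//! let handle = CoreHandle::spawn_with_operators(config, vec![Box::new(MyOperator)])?;
//!
//! // Main thread → core thread: submit an event through the inbox.
//! handle.send_event(event)?;
//!
//! // Core thread → main thread: drain produced outputs from the outbox.
//! for output in handle.poll_outputs(128) {
//!     // handle the output...
//! }
//!
//! handle.shutdown_and_join()?;
//! ```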

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

#[cfg(all(target_os = "linux", feature = "io-uring"))]
use crate::io_uring::{CoreRingManager, IoUringConfig};

use crate::alloc::HotPathGuard;
use crate::budget::TaskBudget;
use crate::numa::{NumaAllocator, NumaTopology};
use crate::operator::{Event, Operator, Output};
use crate::reactor::{Reactor, ReactorConfig};

use super::backpressure::{
    BackpressureConfig, CreditAcquireResult, CreditGate, CreditMetrics, OverflowStrategy,
};
use super::spsc::SpscQueue;
use super::TpcError;
/// Messages sent to a core thread.
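///
/// A sketch of the message flow from the main thread (values are illustrative;
/// `handle` is a `CoreHandle`):
///
/// ```rust,ignore
/// handle.send(CoreMessage::Event(event))?;          // process an event
/// handle.send(CoreMessage::Watermark(5_000))?;      // advance event time
/// handle.send(CoreMessage::CheckpointRequest(1))?;  // snapshot operator state
/// handle.shutdown();                                // enqueues CoreMessage::Shutdown
/// ```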
#[derive(Debug)]
pub enum CoreMessage {
    /// Process an event
    Event(Event),
    /// Watermark advancement
    Watermark(i64),
    /// Request a checkpoint
    CheckpointRequest(u64),
    /// Graceful shutdown
    Shutdown,
}

/// Configuration for a core handle.
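///
/// Unspecified fields can be taken from `Default`, as the tests below do:
///
/// ```rust,ignore
/// let config = CoreConfig {
///     core_id: 3,
///     cpu_affinity: Some(3), // pin to CPU 3
///     ..Default::default()
/// };
/// ```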
#[derive(Debug, Clone)]
pub struct CoreConfig {
    /// Core ID (0-indexed)
    pub core_id: usize,
    /// CPU ID to pin to (None = no pinning)
    pub cpu_affinity: Option<usize>,
    /// Inbox queue capacity
    pub inbox_capacity: usize,
    /// Outbox queue capacity
    pub outbox_capacity: usize,
    /// Reactor configuration
    pub reactor_config: ReactorConfig,
    /// Backpressure configuration
    pub backpressure: BackpressureConfig,
    /// Enable NUMA-aware memory allocation
    pub numa_aware: bool,
    /// `io_uring` configuration (Linux only, requires `io-uring` feature)
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    pub io_uring_config: Option<IoUringConfig>,
}

impl Default for CoreConfig {
    fn default() -> Self {
        Self {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 65536,
            outbox_capacity: 65536,
            reactor_config: ReactorConfig::default(),
            backpressure: BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        }
    }
}

/// Handle to a core's reactor thread.
///
/// Provides lock-free communication with the core via SPSC queues
/// and credit-based flow control for backpressure.
pub struct CoreHandle {
    /// Core ID
    core_id: usize,
    /// NUMA node for this core
    numa_node: usize,
    /// Inbox queue (main thread writes, core reads)
    inbox: Arc<SpscQueue<CoreMessage>>,
    /// Outbox queue (core writes, main thread reads)
    outbox: Arc<SpscQueue<Output>>,
    /// Credit gate for backpressure (sender acquires, receiver releases)
    credit_gate: Arc<CreditGate>,
    /// Thread handle (`None` once the thread has been joined)
    thread: Option<JoinHandle<Result<(), TpcError>>>,
    /// Shutdown flag
    shutdown: Arc<AtomicBool>,
    /// Events processed counter
    events_processed: Arc<AtomicU64>,
    /// Outputs dropped due to full outbox
    outputs_dropped: Arc<AtomicU64>,
    /// Running state
    is_running: Arc<AtomicBool>,
}

impl CoreHandle {
    /// Spawns a new core thread with the given configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread cannot be spawned.
    pub fn spawn(config: CoreConfig) -> Result<Self, TpcError> {
        Self::spawn_with_operators(config, Vec::new())
    }

    /// Spawns a new core thread with operators.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread cannot be spawned or fails to initialize.
    #[allow(clippy::needless_pass_by_value)]
    pub fn spawn_with_operators(
        config: CoreConfig,
        operators: Vec<Box<dyn Operator>>,
    ) -> Result<Self, TpcError> {
        let core_id = config.core_id;
        let cpu_affinity = config.cpu_affinity;
        let reactor_config = config.reactor_config.clone();

        // Detect NUMA topology for NUMA-aware allocation
        let topology = NumaTopology::detect();
        let numa_node =
            cpu_affinity.map_or_else(|| topology.current_node(), |cpu| topology.node_for_cpu(cpu));

        let inbox = Arc::new(SpscQueue::new(config.inbox_capacity));
        let outbox = Arc::new(SpscQueue::new(config.outbox_capacity));
        let credit_gate = Arc::new(CreditGate::new(config.backpressure.clone()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let events_processed = Arc::new(AtomicU64::new(0));
        let outputs_dropped = Arc::new(AtomicU64::new(0));
        let is_running = Arc::new(AtomicBool::new(false));

        let thread_context = CoreThreadContext {
            core_id,
            cpu_affinity,
            reactor_config,
            numa_aware: config.numa_aware,
            numa_node,
            inbox: Arc::clone(&inbox),
            outbox: Arc::clone(&outbox),
            credit_gate: Arc::clone(&credit_gate),
            shutdown: Arc::clone(&shutdown),
            events_processed: Arc::clone(&events_processed),
            outputs_dropped: Arc::clone(&outputs_dropped),
            is_running: Arc::clone(&is_running),
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: config.io_uring_config,
        };

        let thread = thread::Builder::new()
            .name(format!("laminar-core-{core_id}"))
            .spawn(move || core_thread_main(&thread_context, operators))
            .map_err(|e| TpcError::SpawnFailed {
                core_id,
                message: e.to_string(),
            })?;

        // Wait for the thread to signal it's running. If initialization fails,
        // the thread exits without ever setting the flag, so also watch for an
        // early exit to avoid spinning forever.
        while !is_running.load(Ordering::Acquire) {
            if thread.is_finished() {
                return match thread.join() {
                    Ok(Err(e)) => Err(e),
                    Ok(Ok(())) => Err(TpcError::SpawnFailed {
                        core_id,
                        message: "Core thread exited before signaling readiness".to_string(),
                    }),
                    Err(_) => Err(TpcError::SpawnFailed {
                        core_id,
                        message: "Core thread panicked during startup".to_string(),
                    }),
                };
            }
            thread::yield_now();
        }

        Ok(Self {
            core_id,
            numa_node,
            inbox,
            outbox,
            credit_gate,
            thread: Some(thread),
            shutdown,
            events_processed,
            outputs_dropped,
            is_running,
        })
    }

    /// Returns the core ID.
    #[must_use]
    pub fn core_id(&self) -> usize {
        self.core_id
    }

    /// Returns the NUMA node for this core.
    #[must_use]
    pub fn numa_node(&self) -> usize {
        self.numa_node
    }

    /// Returns true if the core thread is running.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }

    /// Returns the number of events processed by this core.
    #[must_use]
    pub fn events_processed(&self) -> u64 {
        self.events_processed.load(Ordering::Relaxed)
    }

    /// Returns the number of outputs dropped due to a full outbox.
    #[must_use]
    pub fn outputs_dropped(&self) -> u64 {
        self.outputs_dropped.load(Ordering::Relaxed)
    }

    /// Sends a message to the core with credit-based flow control.
    ///
    /// This method respects the backpressure configuration:
    /// - `Block`: Spins until credits are available, then sends
    /// - `Drop`: Returns `Ok` but drops the message if no credits are available
    /// - `Error`: Returns an error if no credits are available
    ///
    /// # Errors
    ///
    /// Returns an error if the inbox queue is full or credits are exhausted (with the `Error` strategy).
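    ///
    /// # Example
    ///
    /// A sketch of reacting to backpressure under the `Error` overflow
    /// strategy (event construction omitted):
    ///
    /// ```rust,ignore
    /// match handle.send(CoreMessage::Event(event)) {
    ///     Ok(()) => {}
    ///     Err(TpcError::Backpressure { .. }) => {
    ///         // The core is saturated: retry later or shed load.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```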
    pub fn send(&self, message: CoreMessage) -> Result<(), TpcError> {
        // Try to acquire a credit
        match self.credit_gate.try_acquire() {
            CreditAcquireResult::Acquired => {
                // Have credit, try to push
                self.inbox.push(message).map_err(|_| TpcError::QueueFull {
                    core_id: self.core_id,
                })
            }
            CreditAcquireResult::WouldBlock => {
                // Check overflow strategy
                if self.credit_gate.config().overflow_strategy == OverflowStrategy::Block {
                    // Spin until we get a credit
                    self.credit_gate.acquire_blocking(1);
                    self.inbox.push(message).map_err(|_| TpcError::QueueFull {
                        core_id: self.core_id,
                    })
                } else {
                    // Error strategy - return error
                    Err(TpcError::Backpressure {
                        core_id: self.core_id,
                    })
                }
            }
            CreditAcquireResult::Dropped => {
                // Drop strategy - silently drop, already recorded in metrics
                Ok(())
            }
        }
    }

    /// Tries to send a message without blocking.
    ///
    /// Returns `Err` if no credits are available or the queue is full.
    /// Does not block regardless of overflow strategy.
    ///
    /// # Errors
    ///
    /// Returns an error if credits are exhausted or the queue is full.
    pub fn try_send(&self, message: CoreMessage) -> Result<(), TpcError> {
        match self.credit_gate.try_acquire() {
            CreditAcquireResult::Acquired => {
                self.inbox.push(message).map_err(|_| TpcError::QueueFull {
                    core_id: self.core_id,
                })
            }
            CreditAcquireResult::WouldBlock | CreditAcquireResult::Dropped => {
                Err(TpcError::Backpressure {
                    core_id: self.core_id,
                })
            }
        }
    }

    /// Sends an event to the core with credit-based flow control.
    ///
    /// # Errors
    ///
    /// Returns an error if the inbox queue is full or backpressure applies.
    pub fn send_event(&self, event: Event) -> Result<(), TpcError> {
        self.send(CoreMessage::Event(event))
    }

    /// Tries to send an event without blocking.
    ///
    /// # Errors
    ///
    /// Returns an error if credits are exhausted or the queue is full.
    pub fn try_send_event(&self, event: Event) -> Result<(), TpcError> {
        self.try_send(CoreMessage::Event(event))
    }

    /// Polls the outbox for outputs.
    ///
    /// Returns up to `max_count` outputs.
    ///
    /// # Note
    ///
    /// This method allocates memory. For zero-allocation polling, use
    /// [`poll_outputs_into`](Self::poll_outputs_into) or [`poll_each`](Self::poll_each) instead.
    #[must_use]
    pub fn poll_outputs(&self, max_count: usize) -> Vec<Output> {
        self.outbox.pop_batch(max_count)
    }

    /// Polls the outbox for outputs into a pre-allocated buffer (zero-allocation).
    ///
    /// Outputs are appended to `buffer`. Returns the number of outputs added.
    /// The buffer should have sufficient capacity to avoid reallocation.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut buffer = Vec::with_capacity(1024);
    ///
    /// // First poll - fills buffer
    /// let count1 = core_handle.poll_outputs_into(&mut buffer, 100);
    ///
    /// // Process outputs...
    /// for output in &buffer[..count1] {
    ///     process(output);
    /// }
    ///
    /// // Clear and reuse buffer for next poll (no allocation)
    /// buffer.clear();
    /// let count2 = core_handle.poll_outputs_into(&mut buffer, 100);
    /// ```
    #[inline]
    pub fn poll_outputs_into(&self, buffer: &mut Vec<Output>, max_count: usize) -> usize {
        let start_len = buffer.len();

        self.outbox.pop_each(max_count, |output| {
            buffer.push(output);
            true
        });

        buffer.len() - start_len
    }

    /// Polls the outbox with a callback for each output (zero-allocation).
    ///
    /// Processing stops when:
    /// - `max_count` outputs have been processed
    /// - The outbox becomes empty
    /// - The callback returns `false`
    ///
    /// Returns the number of outputs processed.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Process outputs without any allocation
    /// let count = core_handle.poll_each(100, |output| {
    ///     match output {
    ///         Output::Event(event) => handle_event(event),
    ///         _ => {}
    ///     }
    ///     true // Continue processing
    /// });
    /// ```
    #[inline]
    pub fn poll_each<F>(&self, max_count: usize, f: F) -> usize
    where
        F: FnMut(Output) -> bool,
    {
        self.outbox.pop_each(max_count, f)
    }

    /// Polls a single output from the outbox.
    #[must_use]
    pub fn poll_output(&self) -> Option<Output> {
        self.outbox.pop()
    }

    /// Returns the number of pending messages in the inbox.
    #[must_use]
    pub fn inbox_len(&self) -> usize {
        self.inbox.len()
    }

    /// Returns the number of pending outputs in the outbox.
    #[must_use]
    pub fn outbox_len(&self) -> usize {
        self.outbox.len()
    }

    /// Returns true if backpressure is currently active.
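    ///
    /// # Example
    ///
    /// A sketch of a monitoring check built from the credit accessors below:
    ///
    /// ```rust,ignore
    /// if handle.is_backpressured() {
    ///     let in_flight = handle.max_credits() - handle.available_credits();
    ///     tracing::warn!("core {} saturated: {in_flight} credits in flight", handle.core_id());
    /// }
    /// ```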
    #[must_use]
    pub fn is_backpressured(&self) -> bool {
        self.credit_gate.is_backpressured()
    }

    /// Returns the number of available credits.
    #[must_use]
    pub fn available_credits(&self) -> usize {
        self.credit_gate.available()
    }

    /// Returns the maximum credits configured.
    #[must_use]
    pub fn max_credits(&self) -> usize {
        self.credit_gate.max_credits()
    }

    /// Returns the credit metrics.
    #[must_use]
    pub fn credit_metrics(&self) -> &CreditMetrics {
        self.credit_gate.metrics()
    }

    /// Signals the core to shut down gracefully.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
        // Also send a shutdown message to wake up the thread
        let _ = self.inbox.push(CoreMessage::Shutdown);
    }

    /// Waits for the core thread to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread panicked or returned an error.
    pub fn join(mut self) -> Result<(), TpcError> {
        if let Some(handle) = self.thread.take() {
            handle.join().map_err(|_| TpcError::SpawnFailed {
                core_id: self.core_id,
                message: "Thread panicked".to_string(),
            })?
        } else {
            Ok(())
        }
    }

    /// Sends a shutdown signal and waits for the thread to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread cannot be joined cleanly.
    pub fn shutdown_and_join(self) -> Result<(), TpcError> {
        self.shutdown();
        self.join()
    }
}

impl Drop for CoreHandle {
    fn drop(&mut self) {
        // Signal shutdown if not already done
        self.shutdown.store(true, Ordering::Release);
        // Try to send shutdown message (may fail if queue is full, that's OK)
        let _ = self.inbox.push(CoreMessage::Shutdown);

        // Join the thread if we haven't already
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
    }
}

impl std::fmt::Debug for CoreHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CoreHandle")
            .field("core_id", &self.core_id)
            .field("numa_node", &self.numa_node)
            .field("is_running", &self.is_running())
            .field("events_processed", &self.events_processed())
            .field("outputs_dropped", &self.outputs_dropped())
            .field("inbox_len", &self.inbox_len())
            .field("outbox_len", &self.outbox_len())
            .field("available_credits", &self.available_credits())
            .field("is_backpressured", &self.is_backpressured())
            .finish_non_exhaustive()
    }
}

/// Context passed to the core thread.
struct CoreThreadContext {
    core_id: usize,
    cpu_affinity: Option<usize>,
    reactor_config: ReactorConfig,
    numa_aware: bool,
    numa_node: usize,
    inbox: Arc<SpscQueue<CoreMessage>>,
    outbox: Arc<SpscQueue<Output>>,
    credit_gate: Arc<CreditGate>,
    shutdown: Arc<AtomicBool>,
    events_processed: Arc<AtomicU64>,
    outputs_dropped: Arc<AtomicU64>,
    is_running: Arc<AtomicBool>,
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    io_uring_config: Option<IoUringConfig>,
}

/// Initializes the core thread: sets CPU affinity, NUMA allocator, `io_uring`, and creates the reactor.
fn init_core_thread(
    ctx: &CoreThreadContext,
    operators: Vec<Box<dyn Operator>>,
) -> Result<Reactor, TpcError> {
    // Set CPU affinity if requested
    if let Some(cpu_id) = ctx.cpu_affinity {
        set_cpu_affinity(ctx.core_id, cpu_id)?;
    }

    // Log NUMA information if NUMA-aware mode is enabled
    if ctx.numa_aware {
        tracing::info!(
            "Core {} starting on NUMA node {}",
            ctx.core_id,
            ctx.numa_node
        );
    }

    // Create NUMA allocator for this core (optional - for future state store integration)
    if ctx.numa_aware {
        let topology = NumaTopology::detect();
        let _numa_allocator = NumaAllocator::new(&topology);
    }

    // Initialize the io_uring ring manager if configured (Linux only)
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    let _ring_manager = if let Some(ref io_uring_config) = ctx.io_uring_config {
        match CoreRingManager::new(ctx.core_id, io_uring_config) {
            Ok(manager) => Some(manager),
            Err(e) => {
                eprintln!(
                    "Core {}: Failed to initialize io_uring ring: {e}. Falling back to standard I/O.",
                    ctx.core_id
                );
                None
            }
        }
    } else {
        None
    };

    // Create the reactor with configured settings
    let mut reactor_config = ctx.reactor_config.clone();
    reactor_config.cpu_affinity = ctx.cpu_affinity;

    let mut reactor = Reactor::new(reactor_config).map_err(|e| TpcError::ReactorError {
        core_id: ctx.core_id,
        source: e,
    })?;

    // Add operators
    for op in operators {
        reactor.add_operator(op);
    }

    Ok(reactor)
}

/// Main function for the core thread.
fn core_thread_main(
    ctx: &CoreThreadContext,
    operators: Vec<Box<dyn Operator>>,
) -> Result<(), TpcError> {
    let mut reactor = init_core_thread(ctx, operators)?;

    // Signal that we're running
    ctx.is_running.store(true, Ordering::Release);

    // Main loop
    loop {
        // Check for shutdown
        if ctx.shutdown.load(Ordering::Acquire) {
            break;
        }

        // Hot path guard for inbox processing
        let _guard = HotPathGuard::enter("CoreThread::process_inbox");

        // Task budget tracking for batch processing
        let batch_budget = TaskBudget::ring0_batch();

        // Drain inbox and track messages processed for credit release
        let mut had_work = false;
        let mut messages_processed = 0usize;

        while let Some(message) = ctx.inbox.pop() {
            match message {
                CoreMessage::Event(event) => {
                    if let Err(e) = reactor.submit(event) {
                        eprintln!("Core {}: Failed to submit event: {e}", ctx.core_id);
                    } else {
                        ctx.events_processed.fetch_add(1, Ordering::Relaxed);
                    }
                    messages_processed += 1;
                    had_work = true;
                }
                CoreMessage::Watermark(timestamp) => {
                    // Advance the reactor's watermark so downstream operators
                    // see the updated event-time progress on the next poll().
                    reactor.advance_watermark(timestamp);
                    messages_processed += 1;
                    had_work = true;
                }
                CoreMessage::CheckpointRequest(checkpoint_id) => {
                    // Snapshot all operator states and push to outbox
                    // for Ring 1 to persist via WAL + RocksDB.
                    let operator_states = reactor.trigger_checkpoint();
                    let checkpoint_output = Output::CheckpointComplete {
                        checkpoint_id,
                        operator_states,
                    };
                    if ctx.outbox.push(checkpoint_output).is_err() {
                        ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
                    }
                    messages_processed += 1;
                    had_work = true;
                }
                CoreMessage::Shutdown => {
                    // Stop draining; credits for the messages processed so far
                    // are released exactly once, after this loop.
                    break;
                }
            }

            // Check if the batch budget is almost exceeded and break to let the
            // reactor run. This preserves Ring 0 latency guarantees by bounding
            // batch processing time.
            if batch_budget.almost_exceeded() {
                break;
            }
        }

        // Release credits for processed messages
        // This signals to senders that we have capacity for more
        if messages_processed > 0 {
            ctx.credit_gate.release(messages_processed);
        }

        // Process events in the reactor
        let outputs = reactor.poll();

        // Push outputs to the outbox
        for output in outputs {
            if ctx.outbox.push(output).is_err() {
                ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
            }
            had_work = true;
        }

        // If no work was done, yield to avoid busy-waiting
        if !had_work {
            thread::yield_now();
        }
    }

    // Drain any remaining events before shutdown
    let outputs = reactor.poll();
    for output in outputs {
        let _ = ctx.outbox.push(output);
    }

    ctx.is_running.store(false, Ordering::Release);
    Ok(())
}

/// Sets CPU affinity for the current thread.
fn set_cpu_affinity(core_id: usize, cpu_id: usize) -> Result<(), TpcError> {
    #[cfg(target_os = "linux")]
    {
        use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
        use std::mem;

        // SAFETY: We're calling libc functions with valid parameters.
        // The cpu_set_t is properly initialized with CPU_ZERO.
        // A pid of 0 makes sched_setaffinity apply to the calling thread.
        #[allow(unsafe_code)]
        unsafe {
            let mut set: cpu_set_t = mem::zeroed();
            CPU_ZERO(&mut set);
            CPU_SET(cpu_id, &mut set);

            let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
            if result != 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "sched_setaffinity failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(target_os = "windows")]
    {
        use winapi::shared::basetsd::DWORD_PTR;
        use winapi::um::processthreadsapi::GetCurrentThread;
        use winapi::um::winbase::SetThreadAffinityMask;

        // SAFETY: We're calling Windows API functions with valid parameters.
        // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
        // The mask is a valid CPU mask for the specified core.
        #[allow(unsafe_code)]
        unsafe {
            let mask: DWORD_PTR = 1 << cpu_id;
            let result = SetThreadAffinityMask(GetCurrentThread(), mask);
            if result == 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "SetThreadAffinityMask failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(not(any(target_os = "linux", target_os = "windows")))]
    {
        let _ = (core_id, cpu_id);
        // No-op on other platforms
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::{OperatorState, OutputVec, Timer};
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;
    use std::time::Duration;

    // Simple passthrough operator for testing
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(
            &mut self,
            event: &Event,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: Timer,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> OperatorState {
            OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    fn make_event(value: i64) -> Event {
        let array = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        Event::new(value, batch)
    }

    #[test]
    fn test_core_handle_spawn() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None, // Don't pin in tests
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle = CoreHandle::spawn(config).unwrap();
        assert!(handle.is_running());
        assert_eq!(handle.core_id(), 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_with_operator() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send an event
        let event = make_event(42);
        handle.send_event(event).unwrap();

        // Wait a bit for processing
        thread::sleep(Duration::from_millis(50));

        // Poll for outputs
        let outputs = handle.poll_outputs(10);
        assert!(!outputs.is_empty());

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_multiple_events() {
        let config = CoreConfig {
            core_id: 1,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send multiple events
        for i in 0..100 {
            handle.send_event(make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll all outputs
        let mut total_outputs = 0;
        loop {
            let outputs = handle.poll_outputs(1000);
            if outputs.is_empty() {
                break;
            }
            total_outputs += outputs.len();
        }

        // Expect one output per event, possibly plus watermarks
        assert!(total_outputs >= 100);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_shutdown() {
        let config = CoreConfig::default();
        let handle = CoreHandle::spawn(config).unwrap();

        assert!(handle.is_running());

        handle.shutdown();

        // Wait for shutdown
        thread::sleep(Duration::from_millis(100));

        // Thread should have stopped
        assert!(!handle.is_running());
    }

    #[test]
    fn test_core_handle_debug() {
        let config = CoreConfig {
            core_id: 42,
            ..Default::default()
        };
        let handle = CoreHandle::spawn(config).unwrap();

        let debug_str = format!("{handle:?}");
        assert!(debug_str.contains("CoreHandle"));
        assert!(debug_str.contains("42"));

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_config_default() {
        let config = CoreConfig::default();
        assert_eq!(config.core_id, 0);
        assert!(config.cpu_affinity.is_none());
        assert_eq!(config.inbox_capacity, 65536);
        assert_eq!(config.outbox_capacity, 65536);
        assert!(!config.numa_aware);
    }

    #[test]
    fn test_core_handle_numa_node() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            numa_aware: true,
            ..Default::default()
        };

        let handle = CoreHandle::spawn(config).unwrap();
        // On any system, numa_node should be a valid value (0 on non-NUMA systems)
        assert!(handle.numa_node() < 64);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_outputs_into() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send events
        for i in 0..10 {
            handle.send_event(make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll into pre-allocated buffer
        let mut buffer = Vec::with_capacity(100);
        let count = handle.poll_outputs_into(&mut buffer, 100);

        assert!(count > 0);
        assert_eq!(buffer.len(), count);

        // Reuse buffer - no new allocation
        let cap_before = buffer.capacity();
        buffer.clear();
        let _ = handle.poll_outputs_into(&mut buffer, 100);
        assert_eq!(buffer.capacity(), cap_before); // Capacity unchanged

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send events
        for i in 0..10 {
            handle.send_event(make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll with callback
        let mut event_count = 0;
        let count = handle.poll_each(100, |output| {
            if matches!(output, Output::Event(_)) {
                event_count += 1;
            }
            true
        });

        assert!(count > 0);
        assert!(event_count > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each_early_stop() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send events
        for i in 0..20 {
            handle.send_event(make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll with early stop after 5 items
        let mut processed = 0;
        let count = handle.poll_each(100, |_| {
            processed += 1;
            processed < 5 // Stop after 5
        });

        assert_eq!(count, 5);
        assert_eq!(processed, 5);

        // There should be more items remaining
        let remaining = handle.outbox_len();
        assert!(remaining > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_watermark_propagation() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send a watermark advancement
        handle.send(CoreMessage::Watermark(5000)).unwrap();

        // Wait for processing
        thread::sleep(Duration::from_millis(50));

        // Poll outputs - should contain a watermark
        let outputs = handle.poll_outputs(100);
        let has_watermark = outputs.iter().any(|o| matches!(o, Output::Watermark(_)));
        assert!(
            has_watermark,
            "Expected watermark output after Watermark message"
        );

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_checkpoint_triggering() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send a checkpoint request
        handle.send(CoreMessage::CheckpointRequest(42)).unwrap();

        // Wait for processing
        thread::sleep(Duration::from_millis(50));

        // Poll outputs - should contain a CheckpointComplete
        let outputs = handle.poll_outputs(100);
        let checkpoint = outputs
            .iter()
            .find(|o| matches!(o, Output::CheckpointComplete { .. }));
        assert!(checkpoint.is_some(), "Expected CheckpointComplete output");

        if let Some(Output::CheckpointComplete {
            checkpoint_id,
            operator_states,
        }) = checkpoint
        {
            assert_eq!(*checkpoint_id, 42);
            // One operator (PassthroughOperator)
            assert_eq!(operator_states.len(), 1);
            assert_eq!(operator_states[0].operator_id, "passthrough");
        }

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_outputs_dropped_counter() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 4, // Very small outbox to force drops
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send many events without polling - the outbox should fill up
        for i in 0..100 {
            let _ = handle.send_event(make_event(i));
        }

        // Wait for processing to fill and overflow the outbox
        thread::sleep(Duration::from_millis(200));

        // Some outputs should have been dropped
        let dropped = handle.outputs_dropped();
        assert!(
            dropped > 0,
            "Expected some outputs to be dropped with outbox_capacity=4"
        );

        handle.shutdown_and_join().unwrap();
    }
}