// ringkernel_core/runtime.rs
1//! Runtime traits and types for kernel management.
2//!
3//! This module defines the core runtime abstraction that backends implement
4//! to provide kernel lifecycle management, message passing, and monitoring.
5//!
6//! # Overview
7//!
8//! The runtime module provides the central abstractions for managing GPU kernels:
9//!
10//! - [`RingKernelRuntime`] - The main trait implemented by backends (CPU, CUDA)
11//! - [`KernelHandle`] - A handle for interacting with launched kernels
12//! - [`LaunchOptions`] - Configuration options for kernel launches
13//! - [`KernelState`] - Lifecycle states (Created → Launched → Active → Terminated)
14//!
15//! # Kernel Lifecycle
16//!
17//! ```text
18//! ┌─────────┐     ┌──────────┐     ┌────────┐     ┌────────────┐
19//! │ Created │ ──► │ Launched │ ──► │ Active │ ──► │ Terminated │
20//! └─────────┘     └──────────┘     └────────┘     └────────────┘
21//!                       │              ▲  │
22//!                       │              │  ▼
23//!                       │        ┌─────────────┐
24//!                       └──────► │ Deactivated │
25//!                                └─────────────┘
26//! ```
27//!
28//! # Example
29//!
30//! ```ignore
31//! use ringkernel_core::runtime::{RingKernelRuntime, LaunchOptions, KernelState};
32//! use ringkernel_cpu::CpuRuntime;
33//!
34//! #[tokio::main]
35//! async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
36//!     // Create a runtime
37//!     let runtime = CpuRuntime::new().await?;
38//!
39//!     // Launch a kernel with custom options
40//!     let options = LaunchOptions::single_block(256)
41//!         .with_queue_capacity(2048)
42//!         .with_k2k(true);  // Enable kernel-to-kernel messaging
43//!
44//!     let kernel = runtime.launch("my_processor", options).await?;
45//!
46//!     // Kernel auto-activates by default
47//!     assert!(kernel.is_active());
48//!
49//!     // Send messages to the kernel
50//!     kernel.send(MyMessage { value: 42 }).await?;
51//!
52//!     // Receive responses
53//!     let response = kernel.receive_timeout(Duration::from_secs(1)).await?;
54//!
55//!     // Terminate when done
56//!     kernel.terminate().await?;
57//!
58//!     Ok(())
59//! }
60//! ```
61//!
62//! # Backend Selection
63//!
64//! Use [`Backend::Auto`] to automatically select the best available backend,
65//! or specify a specific backend for testing/deployment:
66//!
67//! ```ignore
68//! use ringkernel_core::runtime::{RuntimeBuilder, Backend};
69//!
70//! // Auto-select: CUDA → CPU
71//! let builder = RuntimeBuilder::new().backend(Backend::Auto);
72//!
73//! // Force CPU for testing
74//! let builder = RuntimeBuilder::new().backend(Backend::Cpu);
75//!
76//! // Use CUDA with specific device
77//! let builder = RuntimeBuilder::new()
78//!     .backend(Backend::Cuda)
79//!     .device(1)  // Second GPU
80//!     .profiling(true);
81//! ```
82
83use std::future::Future;
84use std::pin::Pin;
85use std::sync::Arc;
86use std::time::Duration;
87
88use async_trait::async_trait;
89
90use crate::error::Result;
91use crate::message::{MessageEnvelope, RingMessage};
92use crate::scheduling::SchedulerConfig;
93use crate::telemetry::KernelMetrics;
94use crate::types::KernelMode;
95
/// Unique kernel identifier.
///
/// A thin newtype over `String` used to address kernels in the runtime.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KernelId(pub String);

impl KernelId {
    /// Create a new kernel ID from anything convertible to a `String`.
    pub fn new(id: impl Into<String>) -> Self {
        KernelId(id.into())
    }

    /// Borrow the ID as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for KernelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl From<&str> for KernelId {
    fn from(s: &str) -> Self {
        KernelId(s.to_owned())
    }
}

impl From<String> for KernelId {
    fn from(s: String) -> Self {
        KernelId(s)
    }
}
129
/// Kernel lifecycle state.
///
/// States advance Created → Launched → Active → Terminated, with an
/// Active ⇄ Deactivated pause loop (see the module-level diagram).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KernelState {
    /// Kernel is created but not launched.
    Created,
    /// Kernel is launched and initializing.
    Launched,
    /// Kernel is active and processing messages.
    Active,
    /// Kernel is deactivated (paused).
    Deactivated,
    /// Kernel is terminating.
    Terminating,
    /// Kernel has terminated.
    Terminated,
}

impl KernelState {
    /// True when the kernel may transition to `Active`
    /// (only from `Launched` or `Deactivated`).
    pub fn can_activate(&self) -> bool {
        *self == Self::Launched || *self == Self::Deactivated
    }

    /// True when the kernel may be paused (only from `Active`).
    pub fn can_deactivate(&self) -> bool {
        *self == Self::Active
    }

    /// True when the kernel may be shut down.
    pub fn can_terminate(&self) -> bool {
        matches!(self, Self::Launched | Self::Active | Self::Deactivated)
    }

    /// True while the kernel can process messages.
    pub fn is_running(&self) -> bool {
        *self == Self::Active
    }

    /// True once the kernel has fully stopped.
    pub fn is_finished(&self) -> bool {
        *self == Self::Terminated
    }
}
173
/// Kernel status including state and metrics.
///
/// Returned by [`KernelHandle::status`].
#[derive(Debug, Clone)]
pub struct KernelStatus {
    /// Kernel identifier.
    pub id: KernelId,
    /// Current state.
    pub state: KernelState,
    /// Execution mode.
    pub mode: KernelMode,
    /// Messages in input queue.
    pub input_queue_depth: usize,
    /// Messages in output queue.
    pub output_queue_depth: usize,
    /// Total messages processed.
    pub messages_processed: u64,
    /// Uptime since launch.
    pub uptime: Duration,
}
192
/// GPU backend type.
///
/// `Auto` (the default) lets the runtime pick the best available backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum Backend {
    /// CPU backend (for testing).
    Cpu,
    /// NVIDIA CUDA backend.
    Cuda,
    /// Reserved for future Apple Metal backend (not currently implemented).
    #[doc(hidden)]
    Metal,
    /// Reserved for future WebGPU cross-platform backend (not currently implemented).
    #[doc(hidden)]
    Wgpu,
    /// Automatically select best available backend.
    #[default]
    Auto,
}

impl Backend {
    /// Human-readable display name for this backend.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Cpu => "CPU",
            Self::Cuda => "CUDA",
            Self::Metal => "Metal",
            Self::Wgpu => "WebGPU",
            Self::Auto => "Auto",
        }
    }
}

impl std::fmt::Display for Backend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
229
/// Options for launching a kernel.
///
/// Construct via [`LaunchOptions::default`], [`LaunchOptions::single_block`],
/// or [`LaunchOptions::multi_block`], then refine with the `with_*` builder
/// methods.
#[derive(Debug, Clone)]
pub struct LaunchOptions {
    /// Execution mode (persistent or event-driven).
    pub mode: KernelMode,
    /// Grid size (number of blocks).
    pub grid_size: u32,
    /// Block size (threads per block).
    pub block_size: u32,
    /// Input queue capacity.
    pub input_queue_capacity: usize,
    /// Output queue capacity.
    pub output_queue_capacity: usize,
    /// Shared memory size in bytes.
    pub shared_memory_size: usize,
    /// Whether to activate immediately after launch.
    pub auto_activate: bool,
    /// Enable cooperative groups for grid-wide synchronization.
    /// Requires GPU support for cooperative kernel launch.
    pub cooperative: bool,
    /// Enable K2K (kernel-to-kernel) messaging.
    /// Allocates routing table and inbox buffers on GPU.
    pub enable_k2k: bool,
    /// Dynamic scheduling configuration for persistent actor load balancing.
    ///
    /// When set to a non-`Static` strategy, the codegen layer generates a
    /// scheduler warp pattern: warp 0 in each block handles work distribution,
    /// remaining warps perform computation.
    ///
    /// Default: `None` (static scheduling, current behavior).
    pub scheduler_config: Option<SchedulerConfig>,
}
262
263impl Default for LaunchOptions {
264    fn default() -> Self {
265        Self {
266            mode: KernelMode::Persistent,
267            grid_size: 1,
268            block_size: 256,
269            input_queue_capacity: 1024,
270            output_queue_capacity: 1024,
271            shared_memory_size: 0,
272            auto_activate: true,
273            cooperative: false,
274            enable_k2k: false,
275            scheduler_config: None,
276        }
277    }
278}
279
280impl LaunchOptions {
281    /// Create options for a single-block kernel.
282    pub fn single_block(block_size: u32) -> Self {
283        Self {
284            block_size,
285            ..Default::default()
286        }
287    }
288
289    /// Create options for a multi-block kernel.
290    pub fn multi_block(grid_size: u32, block_size: u32) -> Self {
291        Self {
292            grid_size,
293            block_size,
294            ..Default::default()
295        }
296    }
297
298    /// Set execution mode.
299    pub fn with_mode(mut self, mode: KernelMode) -> Self {
300        self.mode = mode;
301        self
302    }
303
304    /// Set queue capacities.
305    pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
306        self.input_queue_capacity = capacity;
307        self.output_queue_capacity = capacity;
308        self
309    }
310
311    /// Set shared memory size.
312    pub fn with_shared_memory(mut self, size: usize) -> Self {
313        self.shared_memory_size = size;
314        self
315    }
316
317    /// Disable auto-activation.
318    pub fn without_auto_activate(mut self) -> Self {
319        self.auto_activate = false;
320        self
321    }
322
323    /// Set the grid size (number of blocks).
324    pub fn with_grid_size(mut self, grid_size: u32) -> Self {
325        self.grid_size = grid_size;
326        self
327    }
328
329    /// Set the block size (threads per block).
330    pub fn with_block_size(mut self, block_size: u32) -> Self {
331        self.block_size = block_size;
332        self
333    }
334
335    /// Enable cooperative groups for grid-wide synchronization.
336    ///
337    /// When enabled, the kernel will be launched cooperatively, allowing
338    /// all blocks to synchronize via `grid.sync()`. Requires GPU support
339    /// and nvcc at build time.
340    pub fn with_cooperative(mut self, cooperative: bool) -> Self {
341        self.cooperative = cooperative;
342        self
343    }
344
345    /// Enable K2K (kernel-to-kernel) messaging.
346    ///
347    /// When enabled, allocates routing table and inbox buffers on GPU
348    /// for direct kernel-to-kernel communication without host intervention.
349    pub fn with_k2k(mut self, enable: bool) -> Self {
350        self.enable_k2k = enable;
351        self
352    }
353
354    /// Configure dynamic actor scheduling for load balancing.
355    ///
356    /// When set, the codegen layer generates a scheduler warp pattern within
357    /// each persistent kernel block. Warp 0 handles work distribution (stealing,
358    /// round-robin, or priority-based), while remaining warps process messages.
359    ///
360    /// # Example
361    ///
362    /// ```ignore
363    /// use ringkernel_core::scheduling::SchedulerConfig;
364    ///
365    /// let options = LaunchOptions::default()
366    ///     .with_scheduler(SchedulerConfig::work_stealing(8)
367    ///         .with_max_steal_batch(16));
368    /// ```
369    pub fn with_scheduler(mut self, config: SchedulerConfig) -> Self {
370        self.scheduler_config = Some(config);
371        self
372    }
373
374    /// Set priority hint for kernel scheduling.
375    ///
376    /// Note: This is a hint for future use - currently ignored by backends.
377    pub fn with_priority(self, _priority: u8) -> Self {
378        // Priority hint stored for future scheduling use
379        self
380    }
381
382    /// Set input queue capacity only.
383    pub fn with_input_queue_capacity(mut self, capacity: usize) -> Self {
384        self.input_queue_capacity = capacity;
385        self
386    }
387
388    /// Set output queue capacity only.
389    pub fn with_output_queue_capacity(mut self, capacity: usize) -> Self {
390        self.output_queue_capacity = capacity;
391        self
392    }
393}
394
/// Type-erased future for async operations.
///
/// A pinned, boxed, `Send` future whose captures live at least as long
/// as `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
397
/// Backend-agnostic runtime trait for kernel management.
///
/// This trait is implemented by each backend (CPU, CUDA)
/// to provide kernel lifecycle management and message passing.
///
/// Implementors must be `Send + Sync` so a single runtime can be shared
/// across async tasks (e.g. behind an `Arc`).
#[async_trait]
pub trait RingKernelRuntime: Send + Sync {
    /// Get the backend type.
    fn backend(&self) -> Backend;

    /// Check if a specific backend is available.
    fn is_backend_available(&self, backend: Backend) -> bool;

    /// Launch a kernel.
    ///
    /// Returns a [`KernelHandle`] for interacting with the new kernel.
    /// Whether the kernel starts processing immediately is governed by
    /// `options.auto_activate`.
    async fn launch(&self, kernel_id: &str, options: LaunchOptions) -> Result<KernelHandle>;

    /// Get a handle to an existing kernel.
    ///
    /// Returns `None` when no kernel with `kernel_id` is known to the runtime.
    fn get_kernel(&self, kernel_id: &KernelId) -> Option<KernelHandle>;

    /// List all kernel IDs.
    fn list_kernels(&self) -> Vec<KernelId>;

    /// Get runtime metrics.
    fn metrics(&self) -> RuntimeMetrics;

    /// Get per-kernel metrics for a specific kernel.
    ///
    /// Returns detailed metrics for the kernel identified by `kernel_id`,
    /// including message counts, queue depths, uptime, state, and whether
    /// the kernel is GPU-launched. Returns `None` if the kernel is not found.
    ///
    /// Backends may override this with a cheaper lookup.
    fn kernel_metrics(&self, kernel_id: &KernelId) -> Option<KernelMetrics> {
        // Default: look up the kernel handle and delegate to its metrics()
        self.get_kernel(kernel_id).map(|handle| handle.metrics())
    }

    /// Shutdown the runtime and terminate all kernels.
    async fn shutdown(&self) -> Result<()>;
}
435
/// Handle to a launched kernel.
///
/// Provides an ergonomic API for interacting with a kernel.
///
/// Cloning is cheap: clones share the same backend state via the inner
/// `Arc`, so all clones refer to the same kernel.
#[derive(Clone)]
pub struct KernelHandle {
    /// Kernel identifier.
    id: KernelId,
    /// Inner implementation.
    inner: Arc<dyn KernelHandleInner>,
}
446
447impl KernelHandle {
448    /// Create a new kernel handle.
449    pub fn new(id: KernelId, inner: Arc<dyn KernelHandleInner>) -> Self {
450        Self { id, inner }
451    }
452
453    /// Get the kernel ID.
454    pub fn id(&self) -> &KernelId {
455        &self.id
456    }
457
458    /// Activate the kernel.
459    pub async fn activate(&self) -> Result<()> {
460        self.inner.activate().await
461    }
462
463    /// Deactivate the kernel.
464    pub async fn deactivate(&self) -> Result<()> {
465        self.inner.deactivate().await
466    }
467
468    /// Terminate the kernel.
469    pub async fn terminate(&self) -> Result<()> {
470        self.inner.terminate().await
471    }
472
473    /// Send a message to the kernel.
474    pub async fn send<M: RingMessage>(&self, message: M) -> Result<()> {
475        let envelope = MessageEnvelope::new(
476            &message,
477            0, // Host source
478            self.inner.kernel_id_num(),
479            self.inner.current_timestamp(),
480        );
481        self.inner.send_envelope(envelope).await
482    }
483
484    /// Send a raw envelope.
485    pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
486        self.inner.send_envelope(envelope).await
487    }
488
489    /// Receive a message from the kernel.
490    pub async fn receive(&self) -> Result<MessageEnvelope> {
491        self.inner.receive().await
492    }
493
494    /// Receive a message with timeout.
495    pub async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope> {
496        self.inner.receive_timeout(timeout).await
497    }
498
499    /// Try to receive a message (non-blocking).
500    pub fn try_receive(&self) -> Result<MessageEnvelope> {
501        self.inner.try_receive()
502    }
503
504    /// Send request and wait for response (call pattern).
505    pub async fn call<M: RingMessage>(
506        &self,
507        message: M,
508        timeout: Duration,
509    ) -> Result<MessageEnvelope> {
510        // Generate correlation ID
511        let correlation = crate::message::CorrelationId::generate();
512
513        // Create envelope with correlation
514        let mut envelope = MessageEnvelope::new(
515            &message,
516            0,
517            self.inner.kernel_id_num(),
518            self.inner.current_timestamp(),
519        );
520        envelope.header.correlation_id = correlation;
521
522        // Send and wait for correlated response
523        self.inner.send_envelope(envelope).await?;
524        self.inner.receive_correlated(correlation, timeout).await
525    }
526
527    /// Get kernel status.
528    pub fn status(&self) -> KernelStatus {
529        self.inner.status()
530    }
531
532    /// Get kernel metrics.
533    pub fn metrics(&self) -> KernelMetrics {
534        self.inner.metrics()
535    }
536
537    /// Wait for kernel to terminate.
538    pub async fn wait(&self) -> Result<()> {
539        self.inner.wait().await
540    }
541
542    /// Get the current kernel state.
543    ///
544    /// This is a convenience method that returns just the state from status().
545    pub fn state(&self) -> KernelState {
546        self.status().state
547    }
548
549    /// Suspend (deactivate) the kernel.
550    ///
551    /// This is an alias for `deactivate()` for more intuitive API usage.
552    pub async fn suspend(&self) -> Result<()> {
553        self.deactivate().await
554    }
555
556    /// Resume (activate) the kernel.
557    ///
558    /// This is an alias for `activate()` for more intuitive API usage.
559    pub async fn resume(&self) -> Result<()> {
560        self.activate().await
561    }
562
563    /// Check if the kernel is currently active.
564    pub fn is_active(&self) -> bool {
565        self.state() == KernelState::Active
566    }
567
568    /// Check if the kernel has terminated.
569    pub fn is_terminated(&self) -> bool {
570        self.state() == KernelState::Terminated
571    }
572}
573
574impl std::fmt::Debug for KernelHandle {
575    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
576        f.debug_struct("KernelHandle")
577            .field("id", &self.id)
578            .finish()
579    }
580}
581
/// Inner trait for kernel handle implementation.
///
/// This is implemented by each backend to provide the actual functionality.
/// [`KernelHandle`] wraps an `Arc<dyn KernelHandleInner>` and forwards its
/// public methods to these entry points.
#[async_trait]
pub trait KernelHandleInner: Send + Sync {
    /// Get numeric kernel ID.
    ///
    /// Used by `KernelHandle::send`/`call` when addressing envelopes to
    /// this kernel (the host side always uses source ID 0).
    fn kernel_id_num(&self) -> u64;

    /// Get current timestamp.
    fn current_timestamp(&self) -> crate::hlc::HlcTimestamp;

    /// Activate kernel.
    async fn activate(&self) -> Result<()>;

    /// Deactivate kernel.
    async fn deactivate(&self) -> Result<()>;

    /// Terminate kernel.
    async fn terminate(&self) -> Result<()>;

    /// Send message envelope.
    async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Receive message.
    async fn receive(&self) -> Result<MessageEnvelope>;

    /// Receive with timeout.
    async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope>;

    /// Try receive (non-blocking).
    fn try_receive(&self) -> Result<MessageEnvelope>;

    /// Receive the response whose correlation ID matches `correlation`,
    /// failing after `timeout`. Backs the `KernelHandle::call` pattern.
    async fn receive_correlated(
        &self,
        correlation: crate::message::CorrelationId,
        timeout: Duration,
    ) -> Result<MessageEnvelope>;

    /// Get status.
    fn status(&self) -> KernelStatus;

    /// Get metrics.
    fn metrics(&self) -> KernelMetrics;

    /// Wait for termination.
    async fn wait(&self) -> Result<()>;
}
630
/// Runtime-level metrics.
///
/// Aggregate counters across all kernels; returned by
/// [`RingKernelRuntime::metrics`].
#[derive(Debug, Clone, Default)]
pub struct RuntimeMetrics {
    /// Number of active kernels.
    pub active_kernels: usize,
    /// Total kernels launched.
    pub total_launched: u64,
    /// Total messages sent.
    pub messages_sent: u64,
    /// Total messages received.
    pub messages_received: u64,
    /// GPU memory used (bytes).
    pub gpu_memory_used: u64,
    /// Host memory used (bytes).
    pub host_memory_used: u64,
}
647
648/// Builder for creating a runtime instance.
649#[derive(Debug, Clone)]
650pub struct RuntimeBuilder {
651    /// Selected backend.
652    pub backend: Backend,
653    /// Device index (for multi-GPU).
654    pub device_index: usize,
655    /// Enable debug mode.
656    pub debug: bool,
657    /// Enable profiling.
658    pub profiling: bool,
659}
660
661impl Default for RuntimeBuilder {
662    fn default() -> Self {
663        Self {
664            backend: Backend::Auto,
665            device_index: 0,
666            debug: false,
667            profiling: false,
668        }
669    }
670}
671
672impl RuntimeBuilder {
673    /// Create a new builder.
674    pub fn new() -> Self {
675        Self::default()
676    }
677
678    /// Set the backend.
679    pub fn backend(mut self, backend: Backend) -> Self {
680        self.backend = backend;
681        self
682    }
683
684    /// Set device index.
685    pub fn device(mut self, index: usize) -> Self {
686        self.device_index = index;
687        self
688    }
689
690    /// Enable debug mode.
691    pub fn debug(mut self, enable: bool) -> Self {
692        self.debug = enable;
693        self
694    }
695
696    /// Enable profiling.
697    pub fn profiling(mut self, enable: bool) -> Self {
698        self.profiling = enable;
699        self
700    }
701}
702
#[cfg(test)]
mod tests {
    use super::*;

    /// Legal and illegal lifecycle transitions.
    #[test]
    fn test_kernel_state_transitions() {
        assert!(KernelState::Launched.can_activate());
        assert!(KernelState::Deactivated.can_activate());
        assert!(!KernelState::Active.can_activate());
        assert!(!KernelState::Terminated.can_activate());

        assert!(KernelState::Active.can_deactivate());
        assert!(!KernelState::Launched.can_deactivate());

        assert!(KernelState::Active.can_terminate());
        assert!(KernelState::Deactivated.can_terminate());
        assert!(!KernelState::Terminated.can_terminate());
    }

    /// `is_running`/`is_finished` predicates (previously untested).
    #[test]
    fn test_kernel_state_predicates() {
        assert!(KernelState::Active.is_running());
        assert!(!KernelState::Deactivated.is_running());
        assert!(KernelState::Terminated.is_finished());
        assert!(!KernelState::Terminating.is_finished());
    }

    #[test]
    fn test_launch_options_builder() {
        let opts = LaunchOptions::multi_block(4, 128)
            .with_mode(KernelMode::EventDriven)
            .with_queue_capacity(2048)
            .with_shared_memory(4096)
            .without_auto_activate();

        assert_eq!(opts.grid_size, 4);
        assert_eq!(opts.block_size, 128);
        assert_eq!(opts.mode, KernelMode::EventDriven);
        assert_eq!(opts.input_queue_capacity, 2048);
        // with_queue_capacity sets both queue capacities.
        assert_eq!(opts.output_queue_capacity, 2048);
        assert_eq!(opts.shared_memory_size, 4096);
        assert!(!opts.auto_activate);
    }

    #[test]
    fn test_kernel_id() {
        let id1 = KernelId::new("test_kernel");
        let id2: KernelId = "test_kernel".into();
        assert_eq!(id1, id2);
        assert_eq!(id1.as_str(), "test_kernel");
        // Display and From<String> agree with the &str conversion path.
        assert_eq!(id1.to_string(), "test_kernel");
        assert_eq!(KernelId::from(String::from("test_kernel")), id1);
    }

    #[test]
    fn test_backend_name() {
        assert_eq!(Backend::Cpu.name(), "CPU");
        assert_eq!(Backend::Cuda.name(), "CUDA");
        assert_eq!(Backend::Metal.name(), "Metal");
        assert_eq!(Backend::Wgpu.name(), "WebGPU");
        // Auto was previously uncovered; it is also the derived default.
        assert_eq!(Backend::Auto.name(), "Auto");
        assert_eq!(Backend::default(), Backend::Auto);
        // Display delegates to name().
        assert_eq!(Backend::Cuda.to_string(), "CUDA");
    }
}
753}