1use std::future::Future;
84use std::pin::Pin;
85use std::sync::Arc;
86use std::time::Duration;
87
88use async_trait::async_trait;
89
90use crate::error::Result;
91use crate::message::{MessageEnvelope, RingMessage};
92use crate::telemetry::KernelMetrics;
93use crate::types::KernelMode;
94
/// Opaque, string-based identifier of a kernel.
///
/// The wrapped `String` is public. [`KernelId::as_str`] borrows it, and the
/// `Display` impl writes it out unchanged.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KernelId(pub String);

impl KernelId {
    /// Builds an id from anything that converts into an owned `String`.
    pub fn new(id: impl Into<String>) -> Self {
        let owned: String = id.into();
        Self(owned)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for KernelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl From<&str> for KernelId {
    fn from(s: &str) -> Self {
        Self::new(s)
    }
}

impl From<String> for KernelId {
    fn from(s: String) -> Self {
        Self::new(s)
    }
}
128
/// Lifecycle state of a kernel.
///
/// The predicate methods encode which transitions are accepted:
/// `Launched` and `Deactivated` may be activated; only `Active` may be
/// deactivated; `Launched`, `Active` and `Deactivated` may be terminated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KernelState {
    /// Initial state; none of the transition predicates accept it.
    Created,
    /// Launched; may be activated or terminated.
    Launched,
    /// Running; the only state for which `is_running` is true.
    Active,
    /// Deactivated; may be re-activated or terminated.
    Deactivated,
    /// Termination in progress; no transition predicate accepts it.
    Terminating,
    /// Final state; the only state for which `is_finished` is true.
    Terminated,
}

impl KernelState {
    /// Returns `true` when activation is allowed from this state.
    pub fn can_activate(&self) -> bool {
        matches!(*self, Self::Deactivated | Self::Launched)
    }

    /// Returns `true` when deactivation is allowed, i.e. only while running.
    pub fn can_deactivate(&self) -> bool {
        self.is_running()
    }

    /// Returns `true` when termination may be requested from this state.
    pub fn can_terminate(&self) -> bool {
        matches!(*self, Self::Launched | Self::Active | Self::Deactivated)
    }

    /// Returns `true` while the kernel is `Active`.
    pub fn is_running(&self) -> bool {
        *self == Self::Active
    }

    /// Returns `true` once the kernel has reached `Terminated`.
    pub fn is_finished(&self) -> bool {
        *self == Self::Terminated
    }
}
172
/// Point-in-time snapshot of one kernel, as returned by `KernelHandleInner::status`
/// (and forwarded by `KernelHandle::status`).
#[derive(Debug, Clone)]
pub struct KernelStatus {
    /// Identifier of the kernel this snapshot describes.
    pub id: KernelId,
    /// Lifecycle state at the time of the snapshot.
    pub state: KernelState,
    /// Execution mode of the kernel.
    pub mode: KernelMode,
    /// Input queue depth (presumably the number of pending messages — confirm with backends).
    pub input_queue_depth: usize,
    /// Output queue depth (presumably the number of pending messages — confirm with backends).
    pub output_queue_depth: usize,
    /// Count of messages processed so far.
    pub messages_processed: u64,
    /// Uptime of the kernel (NOTE(review): reference point, e.g. launch vs. activation, is backend-defined — confirm).
    pub uptime: Duration,
}
191
/// Compute backend a runtime can target.
///
/// `Auto` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum Backend {
    /// Host CPU backend.
    Cpu,
    /// CUDA backend.
    Cuda,
    /// Metal backend.
    Metal,
    /// WebGPU backend.
    Wgpu,
    /// Automatic selection; the default variant.
    #[default]
    Auto,
}

impl Backend {
    /// Human-readable backend name; this is also what `Display` prints.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Cpu => "CPU",
            Self::Cuda => "CUDA",
            Self::Metal => "Metal",
            Self::Wgpu => "WebGPU",
            Self::Auto => "Auto",
        }
    }
}

impl std::fmt::Display for Backend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
226
227#[derive(Debug, Clone)]
229pub struct LaunchOptions {
230 pub mode: KernelMode,
232 pub grid_size: u32,
234 pub block_size: u32,
236 pub input_queue_capacity: usize,
238 pub output_queue_capacity: usize,
240 pub shared_memory_size: usize,
242 pub auto_activate: bool,
244 pub cooperative: bool,
247 pub enable_k2k: bool,
250}
251
252impl Default for LaunchOptions {
253 fn default() -> Self {
254 Self {
255 mode: KernelMode::Persistent,
256 grid_size: 1,
257 block_size: 256,
258 input_queue_capacity: 1024,
259 output_queue_capacity: 1024,
260 shared_memory_size: 0,
261 auto_activate: true,
262 cooperative: false,
263 enable_k2k: false,
264 }
265 }
266}
267
268impl LaunchOptions {
269 pub fn single_block(block_size: u32) -> Self {
271 Self {
272 block_size,
273 ..Default::default()
274 }
275 }
276
277 pub fn multi_block(grid_size: u32, block_size: u32) -> Self {
279 Self {
280 grid_size,
281 block_size,
282 ..Default::default()
283 }
284 }
285
286 pub fn with_mode(mut self, mode: KernelMode) -> Self {
288 self.mode = mode;
289 self
290 }
291
292 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
294 self.input_queue_capacity = capacity;
295 self.output_queue_capacity = capacity;
296 self
297 }
298
299 pub fn with_shared_memory(mut self, size: usize) -> Self {
301 self.shared_memory_size = size;
302 self
303 }
304
305 pub fn without_auto_activate(mut self) -> Self {
307 self.auto_activate = false;
308 self
309 }
310
311 pub fn with_grid_size(mut self, grid_size: u32) -> Self {
313 self.grid_size = grid_size;
314 self
315 }
316
317 pub fn with_block_size(mut self, block_size: u32) -> Self {
319 self.block_size = block_size;
320 self
321 }
322
323 pub fn with_cooperative(mut self, cooperative: bool) -> Self {
329 self.cooperative = cooperative;
330 self
331 }
332
333 pub fn with_k2k(mut self, enable: bool) -> Self {
338 self.enable_k2k = enable;
339 self
340 }
341
342 pub fn with_priority(self, _priority: u8) -> Self {
346 self
348 }
349
350 pub fn with_input_queue_capacity(mut self, capacity: usize) -> Self {
352 self.input_queue_capacity = capacity;
353 self
354 }
355
356 pub fn with_output_queue_capacity(mut self, capacity: usize) -> Self {
358 self.output_queue_capacity = capacity;
359 self
360 }
361}
362
/// Heap-allocated, type-erased future yielding `T` that is `Send` and may
/// borrow data for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
365
/// Backend-agnostic interface to a kernel runtime.
///
/// Implementors must be `Send + Sync`. The `async fn`s are expanded by
/// `#[async_trait]`.
#[async_trait]
pub trait RingKernelRuntime: Send + Sync {
    /// Returns the backend this runtime reports itself as using.
    fn backend(&self) -> Backend;

    /// Reports whether `backend` is usable from this runtime.
    fn is_backend_available(&self, backend: Backend) -> bool;

    /// Launches the kernel named `kernel_id` with `options` and returns a handle to it.
    async fn launch(&self, kernel_id: &str, options: LaunchOptions) -> Result<KernelHandle>;

    /// Looks up a kernel by id; `None` when the runtime has no kernel under that id.
    fn get_kernel(&self, kernel_id: &KernelId) -> Option<KernelHandle>;

    /// Returns the ids of the kernels known to this runtime.
    fn list_kernels(&self) -> Vec<KernelId>;

    /// Returns a runtime-wide metrics snapshot.
    fn metrics(&self) -> RuntimeMetrics;

    /// Shuts the runtime down (NOTE(review): whether running kernels are terminated is implementation-defined — confirm).
    async fn shutdown(&self) -> Result<()>;
}
393
394#[derive(Clone)]
398pub struct KernelHandle {
399 id: KernelId,
401 inner: Arc<dyn KernelHandleInner>,
403}
404
405impl KernelHandle {
406 pub fn new(id: KernelId, inner: Arc<dyn KernelHandleInner>) -> Self {
408 Self { id, inner }
409 }
410
411 pub fn id(&self) -> &KernelId {
413 &self.id
414 }
415
416 pub async fn activate(&self) -> Result<()> {
418 self.inner.activate().await
419 }
420
421 pub async fn deactivate(&self) -> Result<()> {
423 self.inner.deactivate().await
424 }
425
426 pub async fn terminate(&self) -> Result<()> {
428 self.inner.terminate().await
429 }
430
431 pub async fn send<M: RingMessage>(&self, message: M) -> Result<()> {
433 let envelope = MessageEnvelope::new(
434 &message,
435 0, self.inner.kernel_id_num(),
437 self.inner.current_timestamp(),
438 );
439 self.inner.send_envelope(envelope).await
440 }
441
442 pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
444 self.inner.send_envelope(envelope).await
445 }
446
447 pub async fn receive(&self) -> Result<MessageEnvelope> {
449 self.inner.receive().await
450 }
451
452 pub async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope> {
454 self.inner.receive_timeout(timeout).await
455 }
456
457 pub fn try_receive(&self) -> Result<MessageEnvelope> {
459 self.inner.try_receive()
460 }
461
462 pub async fn call<M: RingMessage>(
464 &self,
465 message: M,
466 timeout: Duration,
467 ) -> Result<MessageEnvelope> {
468 let correlation = crate::message::CorrelationId::generate();
470
471 let mut envelope = MessageEnvelope::new(
473 &message,
474 0,
475 self.inner.kernel_id_num(),
476 self.inner.current_timestamp(),
477 );
478 envelope.header.correlation_id = correlation;
479
480 self.inner.send_envelope(envelope).await?;
482 self.inner.receive_correlated(correlation, timeout).await
483 }
484
485 pub fn status(&self) -> KernelStatus {
487 self.inner.status()
488 }
489
490 pub fn metrics(&self) -> KernelMetrics {
492 self.inner.metrics()
493 }
494
495 pub async fn wait(&self) -> Result<()> {
497 self.inner.wait().await
498 }
499
500 pub fn state(&self) -> KernelState {
504 self.status().state
505 }
506
507 pub async fn suspend(&self) -> Result<()> {
511 self.deactivate().await
512 }
513
514 pub async fn resume(&self) -> Result<()> {
518 self.activate().await
519 }
520
521 pub fn is_active(&self) -> bool {
523 self.state() == KernelState::Active
524 }
525
526 pub fn is_terminated(&self) -> bool {
528 self.state() == KernelState::Terminated
529 }
530}
531
532impl std::fmt::Debug for KernelHandle {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 f.debug_struct("KernelHandle")
535 .field("id", &self.id)
536 .finish()
537 }
538}
539
/// Backend-specific operations behind a `KernelHandle`.
///
/// `KernelHandle` stores an `Arc<dyn KernelHandleInner>` and forwards its
/// public methods to these. Implementors must be `Send + Sync`; the
/// `async fn`s are expanded by `#[async_trait]`.
#[async_trait]
pub trait KernelHandleInner: Send + Sync {
    /// Numeric kernel id. `KernelHandle::send`/`call` pass it as the third
    /// argument of `MessageEnvelope::new` (presumably the destination id — confirm).
    fn kernel_id_num(&self) -> u64;

    /// Current HLC timestamp. `KernelHandle::send`/`call` pass it to
    /// `MessageEnvelope::new` when building outgoing envelopes.
    fn current_timestamp(&self) -> crate::hlc::HlcTimestamp;

    /// Activates the kernel (backs `KernelHandle::activate` and `resume`).
    async fn activate(&self) -> Result<()>;

    /// Deactivates the kernel (backs `KernelHandle::deactivate` and `suspend`).
    async fn deactivate(&self) -> Result<()>;

    /// Terminates the kernel (backs `KernelHandle::terminate`).
    async fn terminate(&self) -> Result<()>;

    /// Delivers an envelope to the kernel.
    async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Receives the next envelope from the kernel.
    async fn receive(&self) -> Result<MessageEnvelope>;

    /// Receives the next envelope, bounded by `timeout`.
    async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope>;

    /// Synchronous receive (presumably returns an error instead of blocking
    /// when nothing is available — confirm per backend).
    fn try_receive(&self) -> Result<MessageEnvelope>;

    /// Receives the envelope matched to `correlation`, bounded by `timeout`.
    /// `KernelHandle::call` uses it right after sending an envelope whose
    /// header carries the same correlation id.
    async fn receive_correlated(
        &self,
        correlation: crate::message::CorrelationId,
        timeout: Duration,
    ) -> Result<MessageEnvelope>;

    /// Returns a status snapshot (backs `KernelHandle::status` and `state`).
    fn status(&self) -> KernelStatus;

    /// Returns per-kernel metrics (backs `KernelHandle::metrics`).
    fn metrics(&self) -> KernelMetrics;

    /// Backs `KernelHandle::wait` (NOTE(review): what is awaited, e.g. termination, is backend-defined — confirm).
    async fn wait(&self) -> Result<()>;
}
588
/// Runtime-wide counters, as returned by `RingKernelRuntime::metrics`.
///
/// The derived `Default` sets every counter to zero.
#[derive(Debug, Clone, Default)]
pub struct RuntimeMetrics {
    /// Number of kernels currently active.
    pub active_kernels: usize,
    /// Total number of kernels launched.
    pub total_launched: u64,
    /// Total messages sent.
    pub messages_sent: u64,
    /// Total messages received.
    pub messages_received: u64,
    /// GPU memory in use (presumably bytes — confirm per backend).
    pub gpu_memory_used: u64,
    /// Host memory in use (presumably bytes — confirm per backend).
    pub host_memory_used: u64,
}
605
606#[derive(Debug, Clone)]
608pub struct RuntimeBuilder {
609 pub backend: Backend,
611 pub device_index: usize,
613 pub debug: bool,
615 pub profiling: bool,
617}
618
619impl Default for RuntimeBuilder {
620 fn default() -> Self {
621 Self {
622 backend: Backend::Auto,
623 device_index: 0,
624 debug: false,
625 profiling: false,
626 }
627 }
628}
629
630impl RuntimeBuilder {
631 pub fn new() -> Self {
633 Self::default()
634 }
635
636 pub fn backend(mut self, backend: Backend) -> Self {
638 self.backend = backend;
639 self
640 }
641
642 pub fn device(mut self, index: usize) -> Self {
644 self.device_index = index;
645 self
646 }
647
648 pub fn debug(mut self, enable: bool) -> Self {
650 self.debug = enable;
651 self
652 }
653
654 pub fn profiling(mut self, enable: bool) -> Self {
656 self.profiling = enable;
657 self
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kernel_state_transitions() {
        assert!(KernelState::Launched.can_activate());
        assert!(KernelState::Deactivated.can_activate());
        assert!(!KernelState::Active.can_activate());
        assert!(!KernelState::Terminated.can_activate());
        assert!(!KernelState::Created.can_activate());

        assert!(KernelState::Active.can_deactivate());
        assert!(!KernelState::Launched.can_deactivate());
        assert!(!KernelState::Deactivated.can_deactivate());

        assert!(KernelState::Active.can_terminate());
        assert!(KernelState::Deactivated.can_terminate());
        assert!(KernelState::Launched.can_terminate());
        assert!(!KernelState::Terminating.can_terminate());
        assert!(!KernelState::Terminated.can_terminate());
    }

    #[test]
    fn test_kernel_state_predicates() {
        assert!(KernelState::Active.is_running());
        assert!(!KernelState::Deactivated.is_running());
        assert!(KernelState::Terminated.is_finished());
        assert!(!KernelState::Terminating.is_finished());
    }

    #[test]
    fn test_launch_options_defaults() {
        let opts = LaunchOptions::default();
        assert_eq!(opts.mode, KernelMode::Persistent);
        assert_eq!(opts.grid_size, 1);
        assert_eq!(opts.block_size, 256);
        assert_eq!(opts.input_queue_capacity, 1024);
        assert_eq!(opts.output_queue_capacity, 1024);
        assert_eq!(opts.shared_memory_size, 0);
        assert!(opts.auto_activate);
        assert!(!opts.cooperative);
        assert!(!opts.enable_k2k);
    }

    #[test]
    fn test_launch_options_builder() {
        let opts = LaunchOptions::multi_block(4, 128)
            .with_mode(KernelMode::EventDriven)
            .with_queue_capacity(2048)
            .with_shared_memory(4096)
            .without_auto_activate();

        assert_eq!(opts.grid_size, 4);
        assert_eq!(opts.block_size, 128);
        assert_eq!(opts.mode, KernelMode::EventDriven);
        assert_eq!(opts.input_queue_capacity, 2048);
        // `with_queue_capacity` sets both queues.
        assert_eq!(opts.output_queue_capacity, 2048);
        assert_eq!(opts.shared_memory_size, 4096);
        assert!(!opts.auto_activate);
    }

    #[test]
    fn test_kernel_id() {
        let id1 = KernelId::new("test_kernel");
        let id2: KernelId = "test_kernel".into();
        let id3: KernelId = String::from("test_kernel").into();
        assert_eq!(id1, id2);
        assert_eq!(id1, id3);
        assert_eq!(id1.as_str(), "test_kernel");
        assert_eq!(id1.to_string(), "test_kernel");
    }

    #[test]
    fn test_backend_name() {
        assert_eq!(Backend::Cpu.name(), "CPU");
        assert_eq!(Backend::Cuda.name(), "CUDA");
        assert_eq!(Backend::Metal.name(), "Metal");
        assert_eq!(Backend::Wgpu.name(), "WebGPU");
        assert_eq!(Backend::Auto.name(), "Auto");
        assert_eq!(Backend::default(), Backend::Auto);
        assert_eq!(Backend::Wgpu.to_string(), "WebGPU");
    }

    #[test]
    fn test_runtime_builder() {
        let default = RuntimeBuilder::default();
        assert_eq!(default.backend, Backend::Auto);
        assert_eq!(default.device_index, 0);
        assert!(!default.debug);
        assert!(!default.profiling);

        let built = RuntimeBuilder::new()
            .backend(Backend::Cpu)
            .device(1)
            .debug(true)
            .profiling(true);
        assert_eq!(built.backend, Backend::Cpu);
        assert_eq!(built.device_index, 1);
        assert!(built.debug);
        assert!(built.profiling);
    }
}