1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use async_trait::async_trait;
12
13use crate::error::Result;
14use crate::message::{MessageEnvelope, RingMessage};
15use crate::telemetry::KernelMetrics;
16use crate::types::KernelMode;
17
/// Identifier of a kernel, stored as an owned string.
///
/// The wrapped `String` is a public field, so it can also be reached
/// directly through `.0`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KernelId(pub String);

impl KernelId {
    /// Builds an id from anything convertible into a `String`.
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the id as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for KernelId {
    /// Writes the raw id text.
    ///
    /// The text is emitted through [`std::fmt::Formatter::pad`] so that
    /// width, fill, alignment and precision flags (for example `{:>12}`) are
    /// applied the same way they are for a plain `&str`. Output for a bare
    /// `{}` is exactly the id text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}

impl From<&str> for KernelId {
    /// Copies the slice into a new owned id.
    fn from(s: &str) -> Self {
        Self(s.to_owned())
    }
}

impl From<String> for KernelId {
    /// Wraps the string without copying it.
    fn from(s: String) -> Self {
        Self(s)
    }
}
51
/// Lifecycle state of a kernel.
///
/// The predicates on this type spell out which states permit activation,
/// deactivation and termination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KernelState {
    /// None of the `can_*` predicates accept this state.
    Created,
    /// Accepted by [`KernelState::can_activate`] and [`KernelState::can_terminate`].
    Launched,
    /// The only state reported by [`KernelState::is_running`]; may be
    /// deactivated or terminated.
    Active,
    /// May be activated again or terminated.
    Deactivated,
    /// None of the `can_*` predicates accept this state.
    Terminating,
    /// The only state reported by [`KernelState::is_finished`].
    Terminated,
}

impl KernelState {
    /// `true` for `Launched` and `Deactivated`.
    pub fn can_activate(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Launched | Self::Deactivated => true,
            Self::Created | Self::Active | Self::Terminating | Self::Terminated => false,
        }
    }

    /// `true` only for `Active`.
    pub fn can_deactivate(&self) -> bool {
        *self == Self::Active
    }

    /// `true` for `Launched`, `Active` and `Deactivated`.
    pub fn can_terminate(&self) -> bool {
        match self {
            Self::Launched | Self::Active | Self::Deactivated => true,
            Self::Created | Self::Terminating | Self::Terminated => false,
        }
    }

    /// `true` only for `Active`.
    pub fn is_running(&self) -> bool {
        *self == Self::Active
    }

    /// `true` only for `Terminated`.
    pub fn is_finished(&self) -> bool {
        *self == Self::Terminated
    }
}
95
96#[derive(Debug, Clone)]
98pub struct KernelStatus {
99 pub id: KernelId,
101 pub state: KernelState,
103 pub mode: KernelMode,
105 pub input_queue_depth: usize,
107 pub output_queue_depth: usize,
109 pub messages_processed: u64,
111 pub uptime: Duration,
113}
114
/// Backend a runtime can run on.
///
/// `Auto` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum Backend {
    /// Displayed as `"CPU"`.
    Cpu,
    /// Displayed as `"CUDA"`.
    Cuda,
    /// Displayed as `"Metal"`.
    Metal,
    /// Displayed as `"WebGPU"`.
    Wgpu,
    /// Displayed as `"Auto"`. Presumably asks the runtime to pick a backend
    /// itself; the selection logic is not part of this file.
    #[default]
    Auto,
}

impl Backend {
    /// Human-readable, static name of the backend.
    pub fn name(&self) -> &'static str {
        match self {
            Backend::Cpu => "CPU",
            Backend::Cuda => "CUDA",
            Backend::Metal => "Metal",
            Backend::Wgpu => "WebGPU",
            Backend::Auto => "Auto",
        }
    }
}

impl std::fmt::Display for Backend {
    /// Writes [`Backend::name`].
    ///
    /// Uses [`std::fmt::Formatter::pad`] so that width, fill, alignment and
    /// precision flags (for example `{:<8}` when printing a table) take
    /// effect; a bare `{}` prints exactly the name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.name())
    }
}
149
150#[derive(Debug, Clone)]
152pub struct LaunchOptions {
153 pub mode: KernelMode,
155 pub grid_size: u32,
157 pub block_size: u32,
159 pub input_queue_capacity: usize,
161 pub output_queue_capacity: usize,
163 pub shared_memory_size: usize,
165 pub auto_activate: bool,
167}
168
169impl Default for LaunchOptions {
170 fn default() -> Self {
171 Self {
172 mode: KernelMode::Persistent,
173 grid_size: 1,
174 block_size: 256,
175 input_queue_capacity: 1024,
176 output_queue_capacity: 1024,
177 shared_memory_size: 0,
178 auto_activate: true,
179 }
180 }
181}
182
183impl LaunchOptions {
184 pub fn single_block(block_size: u32) -> Self {
186 Self {
187 block_size,
188 ..Default::default()
189 }
190 }
191
192 pub fn multi_block(grid_size: u32, block_size: u32) -> Self {
194 Self {
195 grid_size,
196 block_size,
197 ..Default::default()
198 }
199 }
200
201 pub fn with_mode(mut self, mode: KernelMode) -> Self {
203 self.mode = mode;
204 self
205 }
206
207 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
209 self.input_queue_capacity = capacity;
210 self.output_queue_capacity = capacity;
211 self
212 }
213
214 pub fn with_shared_memory(mut self, size: usize) -> Self {
216 self.shared_memory_size = size;
217 self
218 }
219
220 pub fn without_auto_activate(mut self) -> Self {
222 self.auto_activate = false;
223 self
224 }
225
226 pub fn with_grid_size(mut self, grid_size: u32) -> Self {
228 self.grid_size = grid_size;
229 self
230 }
231
232 pub fn with_block_size(mut self, block_size: u32) -> Self {
234 self.block_size = block_size;
235 self
236 }
237
238 pub fn with_priority(self, _priority: u8) -> Self {
242 self
244 }
245
246 pub fn with_input_queue_capacity(mut self, capacity: usize) -> Self {
248 self.input_queue_capacity = capacity;
249 self
250 }
251
252 pub fn with_output_queue_capacity(mut self, capacity: usize) -> Self {
254 self.output_queue_capacity = capacity;
255 self
256 }
257}
258
/// A heap-allocated, pinned, type-erased future that is `Send`, may borrow
/// for `'a`, and resolves to `T`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
261
262#[async_trait]
267pub trait RingKernelRuntime: Send + Sync {
268 fn backend(&self) -> Backend;
270
271 fn is_backend_available(&self, backend: Backend) -> bool;
273
274 async fn launch(&self, kernel_id: &str, options: LaunchOptions) -> Result<KernelHandle>;
276
277 fn get_kernel(&self, kernel_id: &KernelId) -> Option<KernelHandle>;
279
280 fn list_kernels(&self) -> Vec<KernelId>;
282
283 fn metrics(&self) -> RuntimeMetrics;
285
286 async fn shutdown(&self) -> Result<()>;
288}
289
290#[derive(Clone)]
294pub struct KernelHandle {
295 id: KernelId,
297 inner: Arc<dyn KernelHandleInner>,
299}
300
301impl KernelHandle {
302 pub fn new(id: KernelId, inner: Arc<dyn KernelHandleInner>) -> Self {
304 Self { id, inner }
305 }
306
307 pub fn id(&self) -> &KernelId {
309 &self.id
310 }
311
312 pub async fn activate(&self) -> Result<()> {
314 self.inner.activate().await
315 }
316
317 pub async fn deactivate(&self) -> Result<()> {
319 self.inner.deactivate().await
320 }
321
322 pub async fn terminate(&self) -> Result<()> {
324 self.inner.terminate().await
325 }
326
327 pub async fn send<M: RingMessage>(&self, message: M) -> Result<()> {
329 let envelope = MessageEnvelope::new(
330 &message,
331 0, self.inner.kernel_id_num(),
333 self.inner.current_timestamp(),
334 );
335 self.inner.send_envelope(envelope).await
336 }
337
338 pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
340 self.inner.send_envelope(envelope).await
341 }
342
343 pub async fn receive(&self) -> Result<MessageEnvelope> {
345 self.inner.receive().await
346 }
347
348 pub async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope> {
350 self.inner.receive_timeout(timeout).await
351 }
352
353 pub fn try_receive(&self) -> Result<MessageEnvelope> {
355 self.inner.try_receive()
356 }
357
358 pub async fn call<M: RingMessage>(
360 &self,
361 message: M,
362 timeout: Duration,
363 ) -> Result<MessageEnvelope> {
364 let correlation = crate::message::CorrelationId::generate();
366
367 let mut envelope = MessageEnvelope::new(
369 &message,
370 0,
371 self.inner.kernel_id_num(),
372 self.inner.current_timestamp(),
373 );
374 envelope.header.correlation_id = correlation;
375
376 self.inner.send_envelope(envelope).await?;
378 self.inner.receive_correlated(correlation, timeout).await
379 }
380
381 pub fn status(&self) -> KernelStatus {
383 self.inner.status()
384 }
385
386 pub fn metrics(&self) -> KernelMetrics {
388 self.inner.metrics()
389 }
390
391 pub async fn wait(&self) -> Result<()> {
393 self.inner.wait().await
394 }
395
396 pub fn state(&self) -> KernelState {
400 self.status().state
401 }
402
403 pub async fn suspend(&self) -> Result<()> {
407 self.deactivate().await
408 }
409
410 pub async fn resume(&self) -> Result<()> {
414 self.activate().await
415 }
416
417 pub fn is_active(&self) -> bool {
419 self.state() == KernelState::Active
420 }
421
422 pub fn is_terminated(&self) -> bool {
424 self.state() == KernelState::Terminated
425 }
426}
427
428impl std::fmt::Debug for KernelHandle {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 f.debug_struct("KernelHandle")
431 .field("id", &self.id)
432 .finish()
433 }
434}
435
436#[async_trait]
440pub trait KernelHandleInner: Send + Sync {
441 fn kernel_id_num(&self) -> u64;
443
444 fn current_timestamp(&self) -> crate::hlc::HlcTimestamp;
446
447 async fn activate(&self) -> Result<()>;
449
450 async fn deactivate(&self) -> Result<()>;
452
453 async fn terminate(&self) -> Result<()>;
455
456 async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()>;
458
459 async fn receive(&self) -> Result<MessageEnvelope>;
461
462 async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope>;
464
465 fn try_receive(&self) -> Result<MessageEnvelope>;
467
468 async fn receive_correlated(
470 &self,
471 correlation: crate::message::CorrelationId,
472 timeout: Duration,
473 ) -> Result<MessageEnvelope>;
474
475 fn status(&self) -> KernelStatus;
477
478 fn metrics(&self) -> KernelMetrics;
480
481 async fn wait(&self) -> Result<()>;
483}
484
/// Runtime-wide counters returned by `RingKernelRuntime::metrics`.
///
/// Every field is zero in the `Default` value. How the numbers are gathered
/// is up to the runtime implementation.
#[derive(Clone, Debug, Default)]
pub struct RuntimeMetrics {
    /// Count of active kernels.
    pub active_kernels: usize,
    /// Count of kernels launched in total.
    pub total_launched: u64,
    /// Count of messages sent.
    pub messages_sent: u64,
    /// Count of messages received.
    pub messages_received: u64,
    /// GPU memory in use (unit presumably bytes — confirm with the backend).
    pub gpu_memory_used: u64,
    /// Host memory in use (same caveat as `gpu_memory_used`).
    pub host_memory_used: u64,
}
501
502#[derive(Debug, Clone)]
504pub struct RuntimeBuilder {
505 pub backend: Backend,
507 pub device_index: usize,
509 pub debug: bool,
511 pub profiling: bool,
513}
514
515impl Default for RuntimeBuilder {
516 fn default() -> Self {
517 Self {
518 backend: Backend::Auto,
519 device_index: 0,
520 debug: false,
521 profiling: false,
522 }
523 }
524}
525
526impl RuntimeBuilder {
527 pub fn new() -> Self {
529 Self::default()
530 }
531
532 pub fn backend(mut self, backend: Backend) -> Self {
534 self.backend = backend;
535 self
536 }
537
538 pub fn device(mut self, index: usize) -> Self {
540 self.device_index = index;
541 self
542 }
543
544 pub fn debug(mut self, enable: bool) -> Self {
546 self.debug = enable;
547 self
548 }
549
550 pub fn profiling(mut self, enable: bool) -> Self {
552 self.profiling = enable;
553 self
554 }
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the `can_*` predicates, including the states that no
    /// transition accepts (`Created`, `Terminating`, `Terminated`).
    #[test]
    fn test_kernel_state_transitions() {
        assert!(KernelState::Launched.can_activate());
        assert!(KernelState::Deactivated.can_activate());
        assert!(!KernelState::Active.can_activate());
        assert!(!KernelState::Terminated.can_activate());
        assert!(!KernelState::Created.can_activate());
        assert!(!KernelState::Terminating.can_activate());

        assert!(KernelState::Active.can_deactivate());
        assert!(!KernelState::Launched.can_deactivate());
        assert!(!KernelState::Deactivated.can_deactivate());
        assert!(!KernelState::Created.can_deactivate());

        assert!(KernelState::Active.can_terminate());
        assert!(KernelState::Deactivated.can_terminate());
        assert!(KernelState::Launched.can_terminate());
        assert!(!KernelState::Terminated.can_terminate());
        assert!(!KernelState::Created.can_terminate());
        assert!(!KernelState::Terminating.can_terminate());
    }

    /// `is_running` and `is_finished` each accept exactly one state.
    #[test]
    fn test_kernel_state_queries() {
        let all = [
            KernelState::Created,
            KernelState::Launched,
            KernelState::Active,
            KernelState::Deactivated,
            KernelState::Terminating,
            KernelState::Terminated,
        ];
        for state in all.iter() {
            assert_eq!(state.is_running(), *state == KernelState::Active);
            assert_eq!(state.is_finished(), *state == KernelState::Terminated);
        }
    }

    #[test]
    fn test_launch_options_builder() {
        let opts = LaunchOptions::multi_block(4, 128)
            .with_mode(KernelMode::EventDriven)
            .with_queue_capacity(2048)
            .with_shared_memory(4096)
            .without_auto_activate();

        assert_eq!(opts.grid_size, 4);
        assert_eq!(opts.block_size, 128);
        assert_eq!(opts.mode, KernelMode::EventDriven);
        assert_eq!(opts.input_queue_capacity, 2048);
        // `with_queue_capacity` must set both directions.
        assert_eq!(opts.output_queue_capacity, 2048);
        assert_eq!(opts.shared_memory_size, 4096);
        assert!(!opts.auto_activate);
    }

    /// Defaults, the single-field setters, and the no-op `with_priority`.
    #[test]
    fn test_launch_options_defaults_and_overrides() {
        let defaults = LaunchOptions::default();
        assert_eq!(defaults.mode, KernelMode::Persistent);
        assert_eq!(defaults.grid_size, 1);
        assert_eq!(defaults.block_size, 256);
        assert_eq!(defaults.input_queue_capacity, 1024);
        assert_eq!(defaults.output_queue_capacity, 1024);
        assert_eq!(defaults.shared_memory_size, 0);
        assert!(defaults.auto_activate);

        let single = LaunchOptions::single_block(64);
        assert_eq!(single.grid_size, 1);
        assert_eq!(single.block_size, 64);

        let tuned = LaunchOptions::default()
            .with_grid_size(8)
            .with_block_size(32)
            .with_input_queue_capacity(16)
            .with_output_queue_capacity(64)
            .with_priority(7);
        assert_eq!(tuned.grid_size, 8);
        assert_eq!(tuned.block_size, 32);
        assert_eq!(tuned.input_queue_capacity, 16);
        assert_eq!(tuned.output_queue_capacity, 64);
        // `with_priority` leaves every other setting alone.
        assert_eq!(tuned.shared_memory_size, 0);
        assert!(tuned.auto_activate);
    }

    #[test]
    fn test_kernel_id() {
        let id1 = KernelId::new("test_kernel");
        let id2: KernelId = "test_kernel".into();
        let id3 = KernelId::from(String::from("test_kernel"));
        assert_eq!(id1, id2);
        assert_eq!(id2, id3);
        assert_eq!(id1.as_str(), "test_kernel");
        assert_eq!(id1.to_string(), "test_kernel");
    }

    #[test]
    fn test_backend_name() {
        assert_eq!(Backend::Cpu.name(), "CPU");
        assert_eq!(Backend::Cuda.name(), "CUDA");
        assert_eq!(Backend::Metal.name(), "Metal");
        assert_eq!(Backend::Wgpu.name(), "WebGPU");
        assert_eq!(Backend::Auto.name(), "Auto");

        assert_eq!(Backend::default(), Backend::Auto);
        assert_eq!(Backend::Wgpu.to_string(), "WebGPU");
    }

    #[test]
    fn test_runtime_builder() {
        let fresh = RuntimeBuilder::new();
        assert_eq!(fresh.backend, Backend::Auto);
        assert_eq!(fresh.device_index, 0);
        assert!(!fresh.debug);
        assert!(!fresh.profiling);

        let configured = RuntimeBuilder::new()
            .backend(Backend::Cuda)
            .device(1)
            .debug(true)
            .profiling(true);
        assert_eq!(configured.backend, Backend::Cuda);
        assert_eq!(configured.device_index, 1);
        assert!(configured.debug);
        assert!(configured.profiling);
    }
}