Skip to main content

oris_kernel/kernel/
runner.rs

1//! KernelRunner: recommended way to run the kernel from sync or async code.
2//!
3//! Encapsulates the correct Tokio runtime usage so callers do not need to
4//! use `rt.enter()`, `block_in_place`, or `spawn_blocking` manually.
5//! Use this instead of calling `kernel.run_until_blocked` directly when
6//! using GraphStepFnAdapter or other step functions that require a runtime.
7
8use std::sync::Arc;
9
10use crate::kernel::driver::{Kernel, RunStatus, Signal};
11use crate::kernel::identity::RunId;
12use crate::kernel::state::KernelState;
13use crate::kernel::KernelError;
14
15/// Runner that executes the kernel with correct runtime handling.
16///
17/// - **Sync**: Runs the kernel on a dedicated thread with its own Tokio runtime,
18///   so you can call from any thread without an existing runtime.
19/// - **Async**: Uses `spawn_blocking` so the kernel runs on a blocking thread
20///   and does not block the async reactor.
21pub struct KernelRunner<S: KernelState> {
22    kernel: Arc<Kernel<S>>,
23}
24
25impl<S: KernelState> KernelRunner<S> {
26    /// Creates a runner that will use the given kernel for all runs.
27    pub fn new(kernel: Kernel<S>) -> Self {
28        Self {
29            kernel: Arc::new(kernel),
30        }
31    }
32
33    /// Sync entry: runs until blocked/completed on a dedicated thread with an
34    /// internal runtime. Blocks the current thread until the run finishes.
35    pub fn run_until_blocked_sync(
36        &self,
37        run_id: &RunId,
38        initial_state: S,
39    ) -> Result<RunStatus, KernelError> {
40        let kernel = Arc::clone(&self.kernel);
41        let run_id = run_id.clone();
42        let (tx, rx) = std::sync::mpsc::channel();
43        std::thread::spawn(move || {
44            let rt = match tokio::runtime::Builder::new_current_thread()
45                .enable_all()
46                .build()
47            {
48                Ok(rt) => rt,
49                Err(e) => {
50                    let _ = tx.send(Err(KernelError::Driver(e.to_string())));
51                    return;
52                }
53            };
54            // Enter the runtime so step adapters' block_on work; do not nest block_on here.
55            let _guard = rt.enter();
56            let result = kernel.run_until_blocked(&run_id, initial_state);
57            let _ = tx.send(result);
58        });
59        rx.recv()
60            .map_err(|_| KernelError::Driver("runner thread panicked or dropped".into()))?
61    }
62
63    /// Async entry: runs the kernel inside `spawn_blocking` so the async reactor
64    /// is not blocked. Use from async code without deadlock.
65    pub async fn run_until_blocked_async(
66        &self,
67        run_id: &RunId,
68        initial_state: S,
69    ) -> Result<RunStatus, KernelError> {
70        let kernel = Arc::clone(&self.kernel);
71        let run_id = run_id.clone();
72        tokio::task::spawn_blocking(move || {
73            let rt = tokio::runtime::Builder::new_current_thread()
74                .enable_all()
75                .build()
76                .map_err(|e| KernelError::Driver(e.to_string()))?;
77            let _guard = rt.enter();
78            kernel.run_until_blocked(&run_id, initial_state)
79        })
80        .await
81        .map_err(|e| KernelError::Driver(e.to_string()))?
82    }
83
84    /// Sync resume: same as run_until_blocked_sync but after appending a resume event.
85    pub fn resume_sync(
86        &self,
87        run_id: &RunId,
88        initial_state: S,
89        signal: Signal,
90    ) -> Result<RunStatus, KernelError> {
91        let kernel = Arc::clone(&self.kernel);
92        let run_id = run_id.clone();
93        let (tx, rx) = std::sync::mpsc::channel();
94        std::thread::spawn(move || {
95            let rt = match tokio::runtime::Builder::new_current_thread()
96                .enable_all()
97                .build()
98            {
99                Ok(rt) => rt,
100                Err(e) => {
101                    let _ = tx.send(Err(KernelError::Driver(e.to_string())));
102                    return;
103                }
104            };
105            let _guard = rt.enter();
106            let result = kernel.resume(&run_id, initial_state, signal);
107            let _ = tx.send(result);
108        });
109        rx.recv()
110            .map_err(|_| KernelError::Driver("runner thread panicked or dropped".into()))?
111    }
112
113    /// Async resume: same as run_until_blocked_async but after appending a resume event.
114    pub async fn resume_async(
115        &self,
116        run_id: &RunId,
117        initial_state: S,
118        signal: Signal,
119    ) -> Result<RunStatus, KernelError> {
120        let kernel = Arc::clone(&self.kernel);
121        let run_id = run_id.clone();
122        tokio::task::spawn_blocking(move || {
123            let rt = tokio::runtime::Builder::new_current_thread()
124                .enable_all()
125                .build()
126                .map_err(|e| KernelError::Driver(e.to_string()))?;
127            let _guard = rt.enter();
128            kernel.resume(&run_id, initial_state, signal)
129        })
130        .await
131        .map_err(|e| KernelError::Driver(e.to_string()))?
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138    use crate::kernel::driver::{Kernel, RunStatus};
139    use crate::kernel::event_store::InMemoryEventStore;
140    use crate::kernel::reducer::StateUpdatedOnlyReducer;
141    use crate::kernel::state::KernelState;
142    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
143    use serde::{Deserialize, Serialize};
144
145    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
146    struct TestState(u32);
147    impl KernelState for TestState {
148        fn version(&self) -> u32 {
149            1
150        }
151    }
152
153    #[test]
154    fn run_until_blocked_sync_completes() {
155        let kernel = Kernel::<TestState> {
156            events: Box::new(InMemoryEventStore::new()),
157            snaps: None,
158            reducer: Box::new(StateUpdatedOnlyReducer),
159            exec: Box::new(NoopActionExecutor),
160            step: Box::new(NoopStepFn),
161            policy: Box::new(AllowAllPolicy),
162            effect_sink: None,
163            mode: crate::kernel::KernelMode::Normal,
164        };
165        let runner = KernelRunner::new(kernel);
166        let run_id = "runner-sync-test".to_string();
167        let status = runner
168            .run_until_blocked_sync(&run_id, TestState(0))
169            .unwrap();
170        assert!(matches!(status, RunStatus::Completed));
171    }
172
173    #[tokio::test]
174    async fn run_until_blocked_async_completes_no_deadlock() {
175        let kernel = Kernel::<TestState> {
176            events: Box::new(InMemoryEventStore::new()),
177            snaps: None,
178            reducer: Box::new(StateUpdatedOnlyReducer),
179            exec: Box::new(NoopActionExecutor),
180            step: Box::new(NoopStepFn),
181            policy: Box::new(AllowAllPolicy),
182            effect_sink: None,
183            mode: crate::kernel::KernelMode::Normal,
184        };
185        let runner = KernelRunner::new(kernel);
186        let run_id = "runner-async-test".to_string();
187        let status = runner
188            .run_until_blocked_async(&run_id, TestState(0))
189            .await
190            .unwrap();
191        assert!(matches!(status, RunStatus::Completed));
192    }
193
194    #[tokio::test]
195    async fn run_until_blocked_async_twice_no_hang() {
196        let kernel = Kernel::<TestState> {
197            events: Box::new(InMemoryEventStore::new()),
198            snaps: None,
199            reducer: Box::new(StateUpdatedOnlyReducer),
200            exec: Box::new(NoopActionExecutor),
201            step: Box::new(NoopStepFn),
202            policy: Box::new(AllowAllPolicy),
203            effect_sink: None,
204            mode: crate::kernel::KernelMode::Normal,
205        };
206        let runner = KernelRunner::new(kernel);
207        let status1 = runner
208            .run_until_blocked_async(&"run-1".to_string(), TestState(0))
209            .await
210            .unwrap();
211        let status2 = runner
212            .run_until_blocked_async(&"run-2".to_string(), TestState(0))
213            .await
214            .unwrap();
215        assert!(matches!(status1, RunStatus::Completed));
216        assert!(matches!(status2, RunStatus::Completed));
217    }
218
219    /// CI-style: from async context, runner must complete within a timeout (no reactor blocking).
220    #[tokio::test]
221    async fn run_until_blocked_async_completes_within_timeout() {
222        let kernel = Kernel::<TestState> {
223            events: Box::new(InMemoryEventStore::new()),
224            snaps: None,
225            reducer: Box::new(StateUpdatedOnlyReducer),
226            exec: Box::new(NoopActionExecutor),
227            step: Box::new(NoopStepFn),
228            policy: Box::new(AllowAllPolicy),
229            effect_sink: None,
230            mode: crate::kernel::KernelMode::Normal,
231        };
232        let runner = KernelRunner::new(kernel);
233        let result = tokio::time::timeout(
234            std::time::Duration::from_secs(5),
235            runner.run_until_blocked_async(&"timeout-test".to_string(), TestState(0)),
236        )
237        .await;
238        assert!(
239            result.is_ok(),
240            "run_until_blocked_async should complete within 5s (no deadlock)"
241        );
242        let status = result.unwrap().unwrap();
243        assert!(matches!(status, RunStatus::Completed));
244    }
245}