oris_kernel/kernel/
runner.rs1use std::sync::Arc;
9
10use crate::kernel::driver::{Kernel, RunStatus, Signal};
11use crate::kernel::identity::RunId;
12use crate::kernel::state::KernelState;
13use crate::kernel::KernelError;
14
15pub struct KernelRunner<S: KernelState> {
22 kernel: Arc<Kernel<S>>,
23}
24
25impl<S: KernelState> KernelRunner<S> {
26 pub fn new(kernel: Kernel<S>) -> Self {
28 Self {
29 kernel: Arc::new(kernel),
30 }
31 }
32
33 pub fn run_until_blocked_sync(
36 &self,
37 run_id: &RunId,
38 initial_state: S,
39 ) -> Result<RunStatus, KernelError> {
40 let kernel = Arc::clone(&self.kernel);
41 let run_id = run_id.clone();
42 let (tx, rx) = std::sync::mpsc::channel();
43 std::thread::spawn(move || {
44 let rt = match tokio::runtime::Builder::new_current_thread()
45 .enable_all()
46 .build()
47 {
48 Ok(rt) => rt,
49 Err(e) => {
50 let _ = tx.send(Err(KernelError::Driver(e.to_string())));
51 return;
52 }
53 };
54 let _guard = rt.enter();
56 let result = kernel.run_until_blocked(&run_id, initial_state);
57 let _ = tx.send(result);
58 });
59 rx.recv()
60 .map_err(|_| KernelError::Driver("runner thread panicked or dropped".into()))?
61 }
62
63 pub async fn run_until_blocked_async(
66 &self,
67 run_id: &RunId,
68 initial_state: S,
69 ) -> Result<RunStatus, KernelError> {
70 let kernel = Arc::clone(&self.kernel);
71 let run_id = run_id.clone();
72 tokio::task::spawn_blocking(move || {
73 let rt = tokio::runtime::Builder::new_current_thread()
74 .enable_all()
75 .build()
76 .map_err(|e| KernelError::Driver(e.to_string()))?;
77 let _guard = rt.enter();
78 kernel.run_until_blocked(&run_id, initial_state)
79 })
80 .await
81 .map_err(|e| KernelError::Driver(e.to_string()))?
82 }
83
84 pub fn resume_sync(
86 &self,
87 run_id: &RunId,
88 initial_state: S,
89 signal: Signal,
90 ) -> Result<RunStatus, KernelError> {
91 let kernel = Arc::clone(&self.kernel);
92 let run_id = run_id.clone();
93 let (tx, rx) = std::sync::mpsc::channel();
94 std::thread::spawn(move || {
95 let rt = match tokio::runtime::Builder::new_current_thread()
96 .enable_all()
97 .build()
98 {
99 Ok(rt) => rt,
100 Err(e) => {
101 let _ = tx.send(Err(KernelError::Driver(e.to_string())));
102 return;
103 }
104 };
105 let _guard = rt.enter();
106 let result = kernel.resume(&run_id, initial_state, signal);
107 let _ = tx.send(result);
108 });
109 rx.recv()
110 .map_err(|_| KernelError::Driver("runner thread panicked or dropped".into()))?
111 }
112
113 pub async fn resume_async(
115 &self,
116 run_id: &RunId,
117 initial_state: S,
118 signal: Signal,
119 ) -> Result<RunStatus, KernelError> {
120 let kernel = Arc::clone(&self.kernel);
121 let run_id = run_id.clone();
122 tokio::task::spawn_blocking(move || {
123 let rt = tokio::runtime::Builder::new_current_thread()
124 .enable_all()
125 .build()
126 .map_err(|e| KernelError::Driver(e.to_string()))?;
127 let _guard = rt.enter();
128 kernel.resume(&run_id, initial_state, signal)
129 })
130 .await
131 .map_err(|e| KernelError::Driver(e.to_string()))?
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::driver::{Kernel, RunStatus};
    use crate::kernel::event_store::InMemoryEventStore;
    use crate::kernel::reducer::StateUpdatedOnlyReducer;
    use crate::kernel::state::KernelState;
    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
    use serde::{Deserialize, Serialize};

    /// Minimal state type used only to instantiate the generic kernel.
    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
    struct TestState(u32);

    impl KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    /// Builds a runner around a kernel wired entirely from in-memory / no-op
    /// stubs, with no snapshot store and no effect sink.
    fn stub_runner() -> KernelRunner<TestState> {
        KernelRunner::new(Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: crate::kernel::KernelMode::Normal,
        })
    }

    /// The blocking entry point finishes a stub run with `Completed`.
    #[test]
    fn run_until_blocked_sync_completes() {
        let runner = stub_runner();
        let run_id = String::from("runner-sync-test");
        let outcome = runner.run_until_blocked_sync(&run_id, TestState(0));
        let status = outcome.unwrap();
        assert!(matches!(status, RunStatus::Completed));
    }

    /// The async entry point finishes a stub run when awaited from a runtime.
    #[tokio::test]
    async fn run_until_blocked_async_completes_no_deadlock() {
        let runner = stub_runner();
        let run_id = String::from("runner-async-test");
        let outcome = runner.run_until_blocked_async(&run_id, TestState(0)).await;
        let status = outcome.unwrap();
        assert!(matches!(status, RunStatus::Completed));
    }

    /// Two consecutive async runs on the same runner both complete.
    #[tokio::test]
    async fn run_until_blocked_async_twice_no_hang() {
        let runner = stub_runner();
        let first_id = String::from("run-1");
        let second_id = String::from("run-2");

        let status1 = runner
            .run_until_blocked_async(&first_id, TestState(0))
            .await
            .unwrap();
        let status2 = runner
            .run_until_blocked_async(&second_id, TestState(0))
            .await
            .unwrap();

        assert!(matches!(status1, RunStatus::Completed));
        assert!(matches!(status2, RunStatus::Completed));
    }

    /// The async entry point returns within a five-second budget.
    #[tokio::test]
    async fn run_until_blocked_async_completes_within_timeout() {
        let runner = stub_runner();
        let run_id = String::from("timeout-test");
        let budget = std::time::Duration::from_secs(5);

        let result =
            tokio::time::timeout(budget, runner.run_until_blocked_async(&run_id, TestState(0)))
                .await;

        assert!(
            result.is_ok(),
            "run_until_blocked_async should complete within 5s (no deadlock)"
        );
        let status = result.unwrap().unwrap();
        assert!(matches!(status, RunStatus::Completed));
    }
}