1#[cfg(feature = "async_tokio")]
15pub mod tokio_runtime;
16
17use std::any::{type_name, TypeId};
18use std::fmt::Debug;
19use std::sync::{Arc, Mutex};
20use std::thread;
21
22#[allow(unused_imports)]
23#[cfg(feature = "async")]
24use futures::executor as executor_executor;
25
26use bronzeflow_time::schedule_time::ScheduleTimeHolder;
27#[cfg(feature = "async_tokio")]
28use tokio;
29#[cfg(feature = "async_tokio")]
30use tokio::spawn as tokio_spawn;
31
/// Handle returned by [`Runnable::run`] and [`Runnable::run_async`].
///
/// The set of variants depends on which cargo features are enabled.
#[derive(Debug)]
pub enum RuntimeJoinHandle<T> {
    /// Returned by the synchronous runnables in this file once the job has been called inline.
    SyncJobHandle,

    /// Wraps the `tokio` join handle of a spawned task (`async_tokio` feature only).
    #[cfg(feature = "async_tokio")]
    AsyncTokioJoinHandle(tokio::task::JoinHandle<T>),

    /// Carries a `T` directly (`async` feature only).
    #[cfg(feature = "async")]
    FutureBlockJoinHandle(T),

    /// Cannot be constructed, because `Infallible` has no values.
    /// Presumably present so that `T` is still used when the feature-gated variants are compiled out.
    _Unreachable(std::convert::Infallible, std::marker::PhantomData<T>),
}
44
/// Crate-private marker trait without methods; no implementation is visible in this part of the file.
pub(crate) trait NotFnRunnable {}
46
/// Newtype around a closure that returns a future each time it is called.
///
/// Only compiled when the `async` feature is enabled.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct AsyncFn<F, U>(pub F)
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static;
52
/// Newtype around a synchronous closure that takes no arguments and returns nothing.
#[derive(Debug, Clone)]
pub struct SyncFn<F>(pub F)
where
    F: Fn() + Send + Clone + 'static;
55
/// Wraps a future-returning closure in an [`AsyncFn`] (`async` feature only).
#[cfg(feature = "async")]
impl<F, U> From<F> for AsyncFn<F, U>
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static,
{
    fn from(value: F) -> Self {
        Self(value)
    }
}
64
65impl<F: Fn() + Send + 'static + Clone> From<F> for SyncFn<F> {
69 fn from(value: F) -> Self {
70 SyncFn(value)
71 }
72}
73
74pub trait Runnable: 'static {
83 type Handle = RuntimeJoinHandle<()>;
84
85 fn name(&self) -> String {
87 "test name in runnable".to_string()
88 }
89
90 fn set_name(&mut self, _: &str) {}
92
93 #[inline(always)]
94 fn run(&self) -> Self::Handle {
95 self.run_async()
96 }
97
98 fn run_async(&self) -> Self::Handle;
99
100 #[inline(always)]
101 fn is_async(&self) -> bool {
102 false
103 }
104
105 fn metadata(&self) -> Option<RunnableMetadata> {
106 None
107 }
108
109 #[inline(always)]
110 fn run_type_name(&self) -> String {
111 type_name::<Self>().to_string()
112 }
113
114 #[inline(always)]
115 fn run_type_id(&self) -> TypeId {
116 TypeId::of::<Self>()
117 }
118 }
126
/// Construction of an implementor-chosen type from a runnable.
pub trait BuildFromRunnable {
    /// Type produced by [`BuildFromRunnable::build_from`].
    type Type;
    /// Builds `Self::Type` from a `Send + 'static` runnable whose handle is `RuntimeJoinHandle<()>`.
    fn build_from(
        runnable: impl Runnable<Handle = RuntimeJoinHandle<()>> + Send + 'static,
    ) -> Self::Type;
}
134
135pub fn run_async<F, U>(runnable: &F) -> RuntimeJoinHandle<()>
136where
137 F: Fn() -> U + Send + Clone + 'static,
138 U: std::future::Future + Send + 'static,
139{
140 #[allow(unused_variables)]
141 let f = runnable();
142 cfg_if::cfg_if! {
143 if #[cfg(feature = "async_tokio")] {
144 let handle = tokio_spawn({
145 async {
146 f.await;
147 }
148 });
149 RuntimeJoinHandle::AsyncTokioJoinHandle(handle)
150 } else if #[cfg(feature = "async")] {
151 let _output = executor_executor::block_on({
154 async {
155 f.await
156 }
157 });
158 RuntimeJoinHandle::FutureBlockJoinHandle(())
160 } else {
161 panic!("Not support run async");
162 }
163 }
164}
165
/// [`Runnable`] implementation for async closures (`async` feature only).
#[cfg(feature = "async")]
impl<F, U> Runnable for AsyncFn<F, U>
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static,
{
    type Handle = RuntimeJoinHandle<()>;

    /// Hands the wrapped closure to the free function `run_async`.
    #[inline(always)]
    fn run_async(&self) -> Self::Handle {
        let AsyncFn(job) = self;
        run_async(job)
    }

    /// Always `true` for this wrapper.
    #[inline(always)]
    fn is_async(&self) -> bool {
        true
    }
}
182impl<F: Fn() + Send + 'static + Clone> Runnable for SyncFn<F> {
191 type Handle = RuntimeJoinHandle<()>;
192
193 #[inline(always)]
194 fn run_async(&self) -> Self::Handle {
195 self.0();
196 RuntimeJoinHandle::SyncJobHandle
197 }
198}
199
200impl<F: Fn() + Send + 'static + Clone> Runnable for F {
201 fn run_async(&self) -> Self::Handle {
202 self();
203 RuntimeJoinHandle::SyncJobHandle
204 }
205}
206
/// Boxed, sendable trait object for any [`Runnable`] whose handle is `RuntimeJoinHandle<()>`.
pub type RunnerType = Box<dyn Runnable<Handle = RuntimeJoinHandle<()>> + 'static + Send>;
208
/// Newtype around a boxed runnable ([`RunnerType`]).
pub struct WrappedRunner(pub RunnerType);
210
/// Cloneable wrapper holding a [`WrappedRunner`] behind `Arc<Mutex<_>>`;
/// cloning copies the `Arc`, so clones share the same runner.
#[derive(Clone)]
pub struct SafeWrappedRunner(pub(crate) Arc<Mutex<WrappedRunner>>);
213
214impl Runnable for WrappedRunner {
215 type Handle = RuntimeJoinHandle<()>;
216
217 #[inline(always)]
218 fn run_async(&self) -> Self::Handle {
219 self.0.run_async()
220 }
221
222 #[inline(always)]
223 fn run_type_name(&self) -> String {
224 self.0.run_type_name()
225 }
226
227 #[inline(always)]
228 fn run_type_id(&self) -> TypeId {
229 self.0.run_type_id()
230 }
231}
232
233impl Runnable for SafeWrappedRunner {
234 type Handle = RuntimeJoinHandle<()>;
235
236 #[inline(always)]
237 fn run_async(&self) -> Self::Handle {
238 self.0.lock().unwrap().0.run_async()
239 }
240
241 #[inline(always)]
242 fn run_type_name(&self) -> String {
243 self.0.lock().unwrap().0.run_type_name()
245 }
246 #[inline(always)]
247 fn run_type_id(&self) -> TypeId {
248 self.0.lock().unwrap().0.run_type_id()
249 }
250}
251
252pub trait BronzeRuntime {
253 fn run(&self, runnable: impl Runnable, report_msg: bool);
254
255 fn run_safe<F>(&self, runnable: F, report_msg: bool)
256 where
257 F: Runnable + Send + Sync + 'static,
258 {
259 self.run(runnable, report_msg)
260 }
261}
262
/// Field-less runtime type; its `BronzeRuntime` implementation runs jobs on a freshly spawned OS thread.
#[derive(Default)]
pub struct ThreadRuntime {}
265
266impl BronzeRuntime for ThreadRuntime {
267 fn run(&self, _: impl Runnable, _: bool) {
268 panic!("Not supported in `ThreadRuntime`, please use `run_safe`")
269 }
270
271 #[inline(always)]
272 fn run_safe<F>(&self, runnable: F, _: bool)
273 where
274 F: Runnable + Send + Sync + 'static,
275 {
276 let handle = thread::spawn(move || {
277 runnable.run();
278 });
279 handle.join().unwrap();
280 }
281}
282
/// Optional descriptive settings for a runnable.
///
/// Every field is an `Option` and is crate-visible. The `Builder` derive
/// generates `RunnableMetadataBuilder`, whose setters accept anything that
/// converts `into` the field type.
#[derive(Builder, Debug, Clone)]
#[builder(setter(into))]
pub struct RunnableMetadata {
    /// Numeric identifier, if one has been assigned.
    #[allow(dead_code)]
    pub(crate) id: Option<u64>,
    /// Name, if one has been assigned.
    #[allow(dead_code)]
    pub(crate) name: Option<String>,
    /// Presumably an upper bound on how often the job may run — not enforced in this file.
    #[allow(dead_code)]
    pub(crate) maximum_run_times: Option<u64>,
    /// Presumably an upper bound on concurrent executions — not enforced in this file.
    #[allow(dead_code)]
    pub(crate) maximum_parallelism: Option<u32>,
    /// Schedule holder from `bronzeflow_time`, if any.
    #[allow(dead_code)]
    pub(crate) schedule: Option<ScheduleTimeHolder>,
}
297
298impl Default for RunnableMetadata {
299 fn default() -> Self {
300 RunnableMetadataBuilder::default()
301 .id(None)
302 .name(None)
303 .maximum_run_times(None)
304 .maximum_parallelism(None)
305 .schedule(None)
306 .build()
307 .unwrap()
308 }
309}
310
311impl RunnableMetadata {
312 pub fn set_id(&mut self, id: u64) -> &mut Self {
313 self.id = Some(id);
314 self
315 }
316
317 pub fn set_name(&mut self, name: String) -> &mut Self {
318 self.name = Some(name);
319 self
320 }
321
322 pub fn set_maximum_run_times(&mut self, maximum_run_times: u64) -> &mut Self {
323 self.maximum_run_times = Some(maximum_run_times);
324 self
325 }
326
327 pub fn set_maximum_parallelism(&mut self, maximum_parallelism: u32) -> &mut Self {
328 self.maximum_parallelism = Some(maximum_parallelism);
329 self
330 }
331
332 pub fn set_schedule(&mut self, schedule: ScheduleTimeHolder) -> &mut Self {
333 self.schedule = Some(schedule);
334 self
335 }
336}
337
338impl From<&str> for RunnableMetadata {
339 fn from(value: &str) -> Self {
340 let mut m = RunnableMetadata::default();
341 m.set_name(value.to_string());
342 m
343 }
344}
345
/// [`RunnableMetadata`] behind `Arc<Mutex<_>>`.
pub type SafeMetadata = Arc<Mutex<RunnableMetadata>>;
347
// Unit tests for the runnable wrappers, the handle variants and `ThreadRuntime`.
// Several tests are gated on the `async` / `async_tokio` features.
#[cfg(test)]
mod tests {
    use super::*;
    use bronzeflow_utils::{debug, info};

    // `AsyncFn` accepts closures whose futures yield `()` as well as a value.
    #[cfg(feature = "async")]
    #[test]
    fn create_async_fn() {
        let _ = AsyncFn(|| async { info!("I am an async function") });
        let _ = AsyncFn(|| async { 0 });
    }

    // A `move` closure may clone captured state and mutate the clone inside the future.
    #[cfg(feature = "async")]
    #[test]
    #[allow(unused_assignments)]
    fn create_async_fn_mut() {
        let t = String::from("test");
        let _ = AsyncFn(move || {
            let mut nt = t.clone();
            println!("{}", nt);
            async move {
                nt = String::from("new string");
                println!("Nt: {:?}", nt);
                nt.push_str("hx");
                let _ = nt;
            }
        });
    }

    // `AsyncFn::from` works for both capturing and non-capturing closures.
    #[cfg(feature = "async")]
    #[test]
    #[allow(unused_assignments)]
    fn async_fn_from_closure() {
        let _ = AsyncFn::from(|| async {
            info!("new function");
        });
        let t = String::new();
        let _ = AsyncFn::from(move || {
            let mut t = t.clone();
            async move {
                t = String::new();
                info!("{}", t);
            }
        });
    }

    // With tokio enabled, running an `AsyncFn` must yield the tokio handle variant.
    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn async_fn_with_tokio() {
        let f = AsyncFn(|| async { info!("I am an async function") });
        check_run_result(f.run(), true, true);
        check_run_result(f.run_async(), true, true);
    }

    // Without tokio, running an `AsyncFn` must yield the blocking-future handle variant.
    #[cfg(feature = "async")]
    #[cfg(not(feature = "async_tokio"))]
    #[test]
    fn async_fn_without_tokio() {
        let f = AsyncFn(|| async { info!("I am an async function") });
        check_run_result(f.run(), true, false);
        check_run_result(f.run_async(), true, false);
    }

    // `SyncFn` can be built with the tuple constructor or through `From`.
    #[test]
    fn create_sync_fn() {
        let _ = SyncFn(|| info!("I am a sync function"));
        let _ = SyncFn(|| {
            info!("This function could not return data");
        });
        let _ = SyncFn::from(|| info!("I am a sync function"));
    }

    // Running a `SyncFn` must yield the sync handle variant.
    #[test]
    fn sync_fn_run() {
        let f = SyncFn(|| info!("I am a sync function"));
        check_run_result(f.run(), false, false);
        check_run_result(f.run_async(), false, false);
    }

    // Panics unless the handle variant matches the expected (`is_async`, `is_tokio`) combination.
    fn check_run_result<T>(handle: RuntimeJoinHandle<T>, is_async: bool, is_tokio: bool) {
        match handle {
            RuntimeJoinHandle::SyncJobHandle if !is_async => (),
            #[cfg(feature = "async_tokio")]
            RuntimeJoinHandle::AsyncTokioJoinHandle(_) if is_async && is_tokio => (),

            #[cfg(feature = "async")]
            RuntimeJoinHandle::FutureBlockJoinHandle(_) if is_async && !is_tokio => (),
            _ => panic!("Run sync function failed"),
        }
    }

    // Shared driver: checks `is_async()` and validates the handles from `run` and `run_async`.
    fn test_basic_runnable<T, F>(
        runnable: impl Runnable<Handle = T>,
        is_async: bool,
        is_tokio: bool,
        validator: F,
    ) where
        F: Fn(T, bool, bool),
    {
        assert_eq!(is_async, runnable.is_async());
        validator(runnable.run(), is_async, is_tokio);
        validator(runnable.run_async(), is_async, is_tokio);
    }

    #[test]
    fn run_runnable() {
        let sync_fn = SyncFn(|| info!("I am a sync function"));
        test_basic_runnable(sync_fn, false, false, check_run_result);
    }

    #[cfg(feature = "async")]
    #[cfg(not(feature = "async_tokio"))]
    #[test]
    fn run_runnable_without_tokio() {
        let f = AsyncFn(|| async { info!("I am an async function to run without tokio") });
        test_basic_runnable(f, true, false, check_run_result);
    }

    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn run_runnable_with_tokio() {
        let f = AsyncFn(|| async { info!("I am an async function run with tokio") });
        test_basic_runnable(f, true, true, check_run_result);
    }

    // `ThreadRuntime::run` is unsupported and must panic.
    #[test]
    #[should_panic]
    fn bronze_runtime_run_panic() {
        let sync_fn = SyncFn(|| info!("I am a sync function"));
        let rt = ThreadRuntime::default();
        rt.run(sync_fn, false);
    }

    // `ThreadRuntime::run_safe` runs the job on a spawned thread and returns normally.
    #[test]
    fn bronze_runtime_run_safe() {
        let sync_fn = SyncFn(|| info!("I am a sync function"));
        let rt = ThreadRuntime::default();
        rt.run_safe(sync_fn, false);
    }

    // A hand-written `Runnable` works both directly and wrapped in `SafeWrappedRunner`.
    #[test]
    fn custom_runnable() {
        struct CustomRunnable {}
        impl CustomRunnable {
            pub fn new() -> Self {
                CustomRunnable {}
            }
        }
        impl Runnable for CustomRunnable {
            fn run_async(&self) -> Self::Handle {
                RuntimeJoinHandle::SyncJobHandle
            }
        }
        test_basic_runnable(CustomRunnable::new(), false, false, check_run_result);

        let s = SafeWrappedRunner(Arc::new(Mutex::new(WrappedRunner(Box::new(
            CustomRunnable::new(),
        )))));
        test_basic_runnable(s, false, false, check_run_result);
    }

    // Smoke test: the type name and type id accessors can be called and logged.
    #[test]
    fn type_name_type_id() {
        let r1 = SyncFn(|| println!("runnable"));
        let name = r1.run_type_name();
        let id = r1.run_type_id();
        debug!("type name: {}, type id: {:?}", name, id);
    }
}