bronzeflow_core/runtime/
mod.rs

1// This is a part of bronze.
2
//! Bronze's runtime: the abstraction over a basic schedulable task, plus wrappers around the
//! synchronous and asynchronous runtime environments.
//!
//! You can implement your own custom runtime.
//!
//! **There are two abstractions:**
//!
//! 1. `Runnable`: the basic trait implemented by anything that is scheduled to run; it may be a function, a closure, a struct, etc.
//! 2. `BronzeRuntime`: the trait for a runtime environment.
11//!
12// TODO Add more examples to use the runnable and runtime
13
14#[cfg(feature = "async_tokio")]
15pub mod tokio_runtime;
16
17use std::any::{type_name, TypeId};
18use std::fmt::Debug;
19use std::sync::{Arc, Mutex};
20use std::thread;
21
22#[allow(unused_imports)]
23#[cfg(feature = "async")]
24use futures::executor as executor_executor;
25
26use bronzeflow_time::schedule_time::ScheduleTimeHolder;
27#[cfg(feature = "async_tokio")]
28use tokio;
29#[cfg(feature = "async_tokio")]
30use tokio::spawn as tokio_spawn;
31
/// Handle handed back when a runnable has been started.
///
/// The set of variants depends on which cargo features are enabled.
#[derive(Debug)]
pub enum RuntimeJoinHandle<T> {
    /// Returned by the synchronous runnables in this module; carries no payload.
    SyncJobHandle,

    /// Wraps the tokio join handle of a spawned task (feature `async_tokio`).
    #[cfg(feature = "async_tokio")]
    AsyncTokioJoinHandle(tokio::task::JoinHandle<T>),

    /// Carries a value of type `T` for futures driven without tokio (feature `async`).
    #[cfg(feature = "async")]
    FutureBlockJoinHandle(T),

    /// Cannot be constructed because it holds an `Infallible`; it keeps the
    /// type parameter `T` in use when the feature-gated variants are compiled out.
    _Unreachable(std::convert::Infallible, std::marker::PhantomData<T>),
}
44
/// Crate-internal marker trait without any methods.
// NOTE(review): presumably intended to mark runnables that are not plain
// functions/closures (see the commented-out negative impls in this file) — confirm.
pub(crate) trait NotFnRunnable {}
46
/// Newtype around a callable that produces a future every time it is invoked.
///
/// `F` is the callable and `U` the future type it returns. Only compiled with
/// the `async` feature.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct AsyncFn<F, U>(pub F)
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static;
52
/// Newtype around a synchronous callable that takes no arguments and returns nothing.
#[derive(Debug, Clone)]
pub struct SyncFn<F>(pub F)
where
    F: Fn() + Send + Clone + 'static;
55
/// Wraps any matching async callable in an `AsyncFn`.
#[cfg(feature = "async")]
impl<F, U> From<F> for AsyncFn<F, U>
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static,
{
    fn from(value: F) -> Self {
        Self(value)
    }
}
64
65// #[cfg(feature = "async")]
66// impl<F: Fn() -> U + Send + Clone + 'static, U: std::future::Future + Send + 'static> !NotFnRunnable for F{}
67
68impl<F: Fn() + Send + 'static + Clone> From<F> for SyncFn<F> {
69    fn from(value: F) -> Self {
70        SyncFn(value)
71    }
72}
73
74// impl<F: Fn() + Send + 'static + Clone> !NotFnRunnable for F{}
75
76// pub trait RunnableType {
77//     fn get_type_name(&self) -> &str {
78//         type_name::<Self>()
79//     }
80// }
81
82pub trait Runnable: 'static {
83    type Handle = RuntimeJoinHandle<()>;
84
85    // TODO remove name
86    fn name(&self) -> String {
87        "test name in runnable".to_string()
88    }
89
90    // TODO remove name
91    fn set_name(&mut self, _: &str) {}
92
93    #[inline(always)]
94    fn run(&self) -> Self::Handle {
95        self.run_async()
96    }
97
98    fn run_async(&self) -> Self::Handle;
99
100    #[inline(always)]
101    fn is_async(&self) -> bool {
102        false
103    }
104
105    fn metadata(&self) -> Option<RunnableMetadata> {
106        None
107    }
108
109    #[inline(always)]
110    fn run_type_name(&self) -> String {
111        type_name::<Self>().to_string()
112    }
113
114    #[inline(always)]
115    fn run_type_id(&self) -> TypeId {
116        TypeId::of::<Self>()
117    }
118    //
119    // #[inline(always)]
120    // fn to_safe_wrapper(self) -> SafeWrappedRunner {
121    //     SafeWrappedRunner(Arc::new(Mutex::new(
122    //         WrappedRunner(Box::new(self))
123    //     )))
124    // }
125}
126
127// TODO delete this
128pub trait BuildFromRunnable {
129    type Type;
130    fn build_from(
131        runnable: impl Runnable<Handle = RuntimeJoinHandle<()>> + Send + 'static,
132    ) -> Self::Type;
133}
134
135pub fn run_async<F, U>(runnable: &F) -> RuntimeJoinHandle<()>
136where
137    F: Fn() -> U + Send + Clone + 'static,
138    U: std::future::Future + Send + 'static,
139{
140    #[allow(unused_variables)]
141    let f = runnable();
142    cfg_if::cfg_if! {
143        if #[cfg(feature = "async_tokio")] {
144            let handle = tokio_spawn({
145                async {
146                    f.await;
147                }
148            });
149            RuntimeJoinHandle::AsyncTokioJoinHandle(handle)
150        } else if #[cfg(feature = "async")] {
151            // if not tokio, use `block_on`
152            // just for dev
153            let _output = executor_executor::block_on({
154                async {
155                    f.await
156                }
157            });
158            // TODO return data of real type
159            RuntimeJoinHandle::FutureBlockJoinHandle(())
160        } else {
161            panic!("Not support run async");
162        }
163    }
164}
165
/// `AsyncFn` runs by handing its wrapped callable to the free function `run_async`.
#[cfg(feature = "async")]
impl<F, U> Runnable for AsyncFn<F, U>
where
    F: Fn() -> U + Send + Clone + 'static,
    U: std::future::Future + Send + 'static,
{
    type Handle = RuntimeJoinHandle<()>;

    /// Delegates to the module-level `run_async` with the wrapped callable.
    #[inline(always)]
    fn run_async(&self) -> Self::Handle {
        let Self(callable) = self;
        run_async(callable)
    }

    /// Always `true` for `AsyncFn`.
    #[inline(always)]
    fn is_async(&self) -> bool {
        true
    }
}
// Can't do this, see: https://stackoverflow.com/questions/73782573/why-do-blanket-implementations-for-two-different-traits-conflict
183// #[cfg(feature = "async")]
184// impl<F: Fn(i32) -> U + Send + Clone + 'static, U: std::future::Future + Send + 'static> Runnable for F {
185//     fn run_async(&self) -> Self::Handle {
186//         run_async(&self)
187//     }
188// }
189
190impl<F: Fn() + Send + 'static + Clone> Runnable for SyncFn<F> {
191    type Handle = RuntimeJoinHandle<()>;
192
193    #[inline(always)]
194    fn run_async(&self) -> Self::Handle {
195        self.0();
196        RuntimeJoinHandle::SyncJobHandle
197    }
198}
199
200impl<F: Fn() + Send + 'static + Clone> Runnable for F {
201    fn run_async(&self) -> Self::Handle {
202        self();
203        RuntimeJoinHandle::SyncJobHandle
204    }
205}
206
207pub type RunnerType = Box<dyn Runnable<Handle = RuntimeJoinHandle<()>> + 'static + Send>;
208
209pub struct WrappedRunner(pub RunnerType);
210
211#[derive(Clone)]
212pub struct SafeWrappedRunner(pub(crate) Arc<Mutex<WrappedRunner>>);
213
214impl Runnable for WrappedRunner {
215    type Handle = RuntimeJoinHandle<()>;
216
217    #[inline(always)]
218    fn run_async(&self) -> Self::Handle {
219        self.0.run_async()
220    }
221
222    #[inline(always)]
223    fn run_type_name(&self) -> String {
224        self.0.run_type_name()
225    }
226
227    #[inline(always)]
228    fn run_type_id(&self) -> TypeId {
229        self.0.run_type_id()
230    }
231}
232
233impl Runnable for SafeWrappedRunner {
234    type Handle = RuntimeJoinHandle<()>;
235
236    #[inline(always)]
237    fn run_async(&self) -> Self::Handle {
238        self.0.lock().unwrap().0.run_async()
239    }
240
241    #[inline(always)]
242    fn run_type_name(&self) -> String {
243        // self.0.as_ref().lock().unwrap().type_name()
244        self.0.lock().unwrap().0.run_type_name()
245    }
246    #[inline(always)]
247    fn run_type_id(&self) -> TypeId {
248        self.0.lock().unwrap().0.run_type_id()
249    }
250}
251
252pub trait BronzeRuntime {
253    fn run(&self, runnable: impl Runnable, report_msg: bool);
254
255    fn run_safe<F>(&self, runnable: F, report_msg: bool)
256    where
257        F: Runnable + Send + Sync + 'static,
258    {
259        self.run(runnable, report_msg)
260    }
261}
262
/// Stateless runtime that executes runnables on OS threads.
#[derive(Default)]
pub struct ThreadRuntime {}
265
266impl BronzeRuntime for ThreadRuntime {
267    fn run(&self, _: impl Runnable, _: bool) {
268        panic!("Not supported in `ThreadRuntime`, please use `run_safe`")
269    }
270
271    #[inline(always)]
272    fn run_safe<F>(&self, runnable: F, _: bool)
273    where
274        F: Runnable + Send + Sync + 'static,
275    {
276        let handle = thread::spawn(move || {
277            runnable.run();
278        });
279        handle.join().unwrap();
280    }
281}
282
283#[derive(Builder, Debug, Clone)]
284#[builder(setter(into))]
285pub struct RunnableMetadata {
286    #[allow(dead_code)]
287    pub(crate) id: Option<u64>,
288    #[allow(dead_code)]
289    pub(crate) name: Option<String>,
290    #[allow(dead_code)]
291    pub(crate) maximum_run_times: Option<u64>,
292    #[allow(dead_code)]
293    pub(crate) maximum_parallelism: Option<u32>,
294    #[allow(dead_code)]
295    pub(crate) schedule: Option<ScheduleTimeHolder>,
296}
297
298impl Default for RunnableMetadata {
299    fn default() -> Self {
300        RunnableMetadataBuilder::default()
301            .id(None)
302            .name(None)
303            .maximum_run_times(None)
304            .maximum_parallelism(None)
305            .schedule(None)
306            .build()
307            .unwrap()
308    }
309}
310
311impl RunnableMetadata {
312    pub fn set_id(&mut self, id: u64) -> &mut Self {
313        self.id = Some(id);
314        self
315    }
316
317    pub fn set_name(&mut self, name: String) -> &mut Self {
318        self.name = Some(name);
319        self
320    }
321
322    pub fn set_maximum_run_times(&mut self, maximum_run_times: u64) -> &mut Self {
323        self.maximum_run_times = Some(maximum_run_times);
324        self
325    }
326
327    pub fn set_maximum_parallelism(&mut self, maximum_parallelism: u32) -> &mut Self {
328        self.maximum_parallelism = Some(maximum_parallelism);
329        self
330    }
331
332    pub fn set_schedule(&mut self, schedule: ScheduleTimeHolder) -> &mut Self {
333        self.schedule = Some(schedule);
334        self
335    }
336}
337
338impl From<&str> for RunnableMetadata {
339    fn from(value: &str) -> Self {
340        let mut m = RunnableMetadata::default();
341        m.set_name(value.to_string());
342        m
343    }
344}
345
346pub type SafeMetadata = Arc<Mutex<RunnableMetadata>>;
347
#[cfg(test)]
mod tests {
    use super::*;
    use bronzeflow_utils::{debug, info};

    /// `AsyncFn` accepts async closures regardless of their output type.
    #[cfg(feature = "async")]
    #[test]
    fn create_async_fn() {
        let _ = AsyncFn(|| async { info!("I am an async function") });
        let _ = AsyncFn(|| async { 0 });
    }

    /// `AsyncFn` accepts a `move` closure whose future mutates captured state.
    #[cfg(feature = "async")]
    #[test]
    #[allow(unused_assignments)]
    fn create_async_fn_mut() {
        let source = String::from("test");
        let _ = AsyncFn(move || {
            let mut text = source.clone();
            println!("{}", text);
            async move {
                text = String::from("new string");
                println!("Nt: {:?}", text);
                text.push_str("hx");
                let _ = text;
            }
        });
    }

    /// `AsyncFn::from` works for plain and for capturing closures.
    #[cfg(feature = "async")]
    #[test]
    #[allow(unused_assignments)]
    fn async_fn_from_closure() {
        let _ = AsyncFn::from(|| async {
            info!("new function");
        });
        let captured = String::new();
        let _ = AsyncFn::from(move || {
            let mut local = captured.clone();
            async move {
                local = String::new();
                info!("{}", local);
            }
        });
    }

    /// With tokio enabled, running an `AsyncFn` yields a tokio join handle.
    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn async_fn_with_tokio() {
        let job = AsyncFn(|| async { info!("I am an async function") });
        check_run_result(job.run(), true, true);
        check_run_result(job.run_async(), true, true);
    }

    /// Without tokio, running an `AsyncFn` yields a blocking-future handle.
    #[cfg(feature = "async")]
    #[cfg(not(feature = "async_tokio"))]
    #[test]
    fn async_fn_without_tokio() {
        let job = AsyncFn(|| async { info!("I am an async function") });
        check_run_result(job.run(), true, false);
        check_run_result(job.run_async(), true, false);
    }

    /// `SyncFn` can be built directly or through `From`.
    #[test]
    fn create_sync_fn() {
        let _ = SyncFn(|| info!("I am a sync function"));
        let _ = SyncFn(|| {
            info!("This function could not return data");
        });
        let _ = SyncFn::from(|| info!("I am a sync function"));
    }

    /// Running a `SyncFn` yields the synchronous handle.
    #[test]
    fn sync_fn_run() {
        let job = SyncFn(|| info!("I am a sync function"));
        check_run_result(job.run(), false, false);
        check_run_result(job.run_async(), false, false);
    }

    /// Panics unless `handle` is the variant expected for the given flags.
    fn check_run_result<T>(handle: RuntimeJoinHandle<T>, is_async: bool, is_tokio: bool) {
        let accepted = match handle {
            RuntimeJoinHandle::SyncJobHandle => !is_async,
            #[cfg(feature = "async_tokio")]
            RuntimeJoinHandle::AsyncTokioJoinHandle(_) => is_async && is_tokio,

            #[cfg(feature = "async")]
            RuntimeJoinHandle::FutureBlockJoinHandle(_) => is_async && !is_tokio,
            _ => false,
        };
        assert!(accepted, "Run sync function failed");
    }

    /// Checks `is_async` and validates the handles of `run` and `run_async`, in that order.
    fn test_basic_runnable<T, F>(
        runnable: impl Runnable<Handle = T>,
        is_async: bool,
        is_tokio: bool,
        validator: F,
    ) where
        F: Fn(T, bool, bool),
    {
        assert_eq!(is_async, runnable.is_async());

        let from_run = runnable.run();
        validator(from_run, is_async, is_tokio);

        let from_run_async = runnable.run_async();
        validator(from_run_async, is_async, is_tokio);
    }

    #[test]
    fn run_runnable() {
        let job = SyncFn(|| info!("I am a sync function"));
        test_basic_runnable(job, false, false, check_run_result);
    }

    #[cfg(feature = "async")]
    #[cfg(not(feature = "async_tokio"))]
    #[test]
    fn run_runnable_without_tokio() {
        let job = AsyncFn(|| async { info!("I am an async function to run without tokio") });
        test_basic_runnable(job, true, false, check_run_result);
    }

    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn run_runnable_with_tokio() {
        let job = AsyncFn(|| async { info!("I am an async function run with tokio") });
        test_basic_runnable(job, true, true, check_run_result);
    }

    /// `ThreadRuntime::run` is unsupported and must panic.
    #[test]
    #[should_panic]
    fn bronze_runtime_run_panic() {
        let job = SyncFn(|| info!("I am a sync function"));
        let runtime = ThreadRuntime::default();
        runtime.run(job, false);
    }

    /// `ThreadRuntime::run_safe` runs the job without panicking.
    #[test]
    fn bronze_runtime_run_safe() {
        let job = SyncFn(|| info!("I am a sync function"));
        let runtime = ThreadRuntime::default();
        runtime.run_safe(job, false);
    }

    /// A hand-written `Runnable` works directly and when wrapped in `SafeWrappedRunner`.
    #[test]
    fn custom_runnable() {
        struct CustomRunnable {}
        impl CustomRunnable {
            pub fn new() -> Self {
                CustomRunnable {}
            }
        }
        impl Runnable for CustomRunnable {
            fn run_async(&self) -> Self::Handle {
                RuntimeJoinHandle::SyncJobHandle
            }
        }
        test_basic_runnable(CustomRunnable::new(), false, false, check_run_result);

        let boxed: RunnerType = Box::new(CustomRunnable::new());
        let shared = SafeWrappedRunner(Arc::new(Mutex::new(WrappedRunner(boxed))));
        test_basic_runnable(shared, false, false, check_run_result);
    }

    /// Smoke test: type name and type id can be queried and logged.
    #[test]
    fn type_name_type_id() {
        let job = SyncFn(|| println!("runnable"));
        let name = job.run_type_name();
        let id = job.run_type_id();
        debug!("type name: {}, type id: {:?}", name, id);
    }
}