bronzeflow_core/session/
mod.rs

1use crate::executor::DefaultExecutor;
2use crate::manager::ScheduleManager;
3use crate::prelude::{Executor, ThreadTrigger, Trigger, DAG};
4use crate::service::Service;
5use crate::store::{MemoryStorage, Storage};
6use crate::task::RunnableHolder;
7use bronzeflow_time::prelude::ScheduleExpr;
8use bronzeflow_utils::{debug, BronzeError, Result};
9use std::fmt::Debug;
10
11pub trait Session: Service {
12    fn submit<D, S>(&mut self, s: S, try_into_dag: D) -> Result<()>
13    where
14        D: Into<DAG>,
15        S: TryInto<ScheduleExpr>,
16        BronzeError: From<S::Error>,
17    {
18        let schedule_expr = s.try_into()?;
19        let mut dag = try_into_dag.into();
20        dag.set_schedule(schedule_expr);
21        dag.prepare();
22
23        let runnable = if dag.cal_task_nums() == 1 {
24            debug!("Dag has one task, transform it to single task");
25            let task = dag.to_single_task()?;
26            RunnableHolder::Task(task)
27        } else {
28            RunnableHolder::Dag(dag)
29        };
30        self.submit_runnable(runnable)
31    }
32
33    fn submit_runnable(&mut self, runnable: RunnableHolder) -> Result<()>;
34
35    fn build_session(&mut self) -> Result<()>;
36}
37
38pub struct LocalSession<SG: Storage + 'static, TG: Trigger + 'static, E: Executor + 'static> {
39    manager: Option<ScheduleManager<SG, TG, E>>,
40    trigger: Option<TG>,
41    storage: Option<SG>,
42    executor: Option<E>,
43}
44
45impl<SG: Storage, TG: Trigger, E: Executor> LocalSession<SG, TG, E> {
46    pub fn new(storage: Option<SG>, trigger: Option<TG>, executor: Option<E>) -> Self {
47        LocalSession {
48            manager: None,
49            trigger,
50            storage,
51            executor,
52        }
53    }
54}
55
56impl<SG: Storage, TG: Trigger, E: Executor> Service for LocalSession<SG, TG, E> {
57    fn start(&mut self) {
58        self.manager.as_mut().unwrap().start();
59    }
60
61    fn stop(&mut self) {
62        self.manager.as_mut().unwrap().stop();
63    }
64}
65
66impl<SG: Storage, TG: Trigger, E: Executor> Drop for LocalSession<SG, TG, E> {
67    fn drop(&mut self) {
68        if let Some(ref mut m) = self.manager {
69            m.stop();
70        }
71    }
72}
73
74impl<SG: Storage, TG: Trigger, E: Executor> Session for LocalSession<SG, TG, E> {
75    fn submit_runnable(&mut self, runnable: RunnableHolder) -> Result<()> {
76        self.manager.as_mut().unwrap().add_runnable(runnable);
77        Ok(())
78    }
79
80    fn build_session(&mut self) -> Result<()> {
81        let storage = self
82            .storage
83            .take()
84            .ok_or_else(|| BronzeError::msg("Please set storage first"))?;
85        let trigger = self
86            .trigger
87            .take()
88            .ok_or_else(|| BronzeError::msg("Please set trigger first"))?;
89        let executor = self
90            .executor
91            .take()
92            .ok_or_else(|| BronzeError::msg("Please set executor first"))?;
93        self.manager = Some(ScheduleManager::new(storage, trigger, executor));
94        Ok(())
95    }
96}
97
98#[derive(Debug, Clone)]
99pub struct SessionBuilder<
100    SG: Storage = MemoryStorage,
101    TG: Trigger = ThreadTrigger,
102    E: Executor = DefaultExecutor,
103    S: Session = LocalSession<SG, TG, E>,
104> {
105    /// Scheduler backend address
106    uri: Option<String>,
107
108    /// Schedule trigger
109    trigger: Option<TG>,
110
111    /// The storage of DAG
112    storage: Option<SG>,
113
114    executor: Option<E>,
115
116    /// Schedule session
117    #[allow(dead_code)]
118    session: Option<S>,
119}
120
121impl SessionBuilder {
122    pub fn set_uri(mut self, uri: &str) -> Self {
123        self.uri = Some(uri.to_string());
124        self
125    }
126}
127
128impl<SG: Storage, TG: Trigger, E: Executor, S: Session> SessionBuilder<SG, TG, E, S> {
129    pub fn trigger(mut self, trigger: TG) -> Self {
130        self.trigger = Some(trigger);
131        self
132    }
133
134    pub fn storage(mut self, storage: SG) -> Self {
135        self.storage = Some(storage);
136        self
137    }
138
139    pub fn executor(mut self, executor: E) -> Self {
140        self.executor = Some(executor);
141        self
142    }
143
144    pub fn get(&mut self) -> Result<(SG, TG, E)> {
145        let storage = self
146            .storage
147            .take()
148            .ok_or_else(|| BronzeError::msg("Please set storage first"))?;
149        let trigger = self
150            .trigger
151            .take()
152            .ok_or_else(|| BronzeError::msg("Please set trigger first"))?;
153        let executor = self
154            .executor
155            .take()
156            .ok_or_else(|| BronzeError::msg("Please set executor first"))?;
157        Ok((storage, trigger, executor))
158    }
159}
160
161impl<SG: Storage, TG: Trigger, E: Executor> SessionBuilder<SG, TG, E, LocalSession<SG, TG, E>> {
162    pub fn build(mut self) -> Result<LocalSession<SG, TG, E>> {
163        let (storage, trigger, executor) = self.get()?;
164        let mut session = LocalSession::new(Some(storage), Some(trigger), Some(executor));
165        let _ = &session.build_session()?;
166        session.start();
167        Ok(session)
168    }
169}
170
/// Constructor trait for values that describe a local session.
pub trait LocalSessionFactory {
    /// Creates the local variant of the implementing type.
    fn local() -> Self;
}
174
/// Constructor trait for values that come with a ready-made default configuration.
///
/// This is a crate-local trait, distinct from `std::default::Default`.
pub trait DefaultSessionFactory {
    /// Creates the default variant of the implementing type.
    fn default() -> Self;
}
178
179impl DefaultSessionFactory
180    for SessionBuilder<
181        MemoryStorage,
182        ThreadTrigger,
183        DefaultExecutor,
184        LocalSession<MemoryStorage, ThreadTrigger, DefaultExecutor>,
185    >
186{
187    fn default() -> Self {
188        let storage = Some(MemoryStorage::new());
189        let trigger = Some(ThreadTrigger::new());
190        let executor = Some(DefaultExecutor::default());
191        SessionBuilder {
192            uri: None,
193            storage,
194            trigger,
195            executor,
196            session: Some(LocalSession::new(
197                Some(MemoryStorage::new()),
198                Some(ThreadTrigger::new()),
199                Some(DefaultExecutor::default()),
200            )),
201        }
202    }
203}
204
205impl<SG: Storage, TG: Trigger, E: Executor> LocalSessionFactory
206    for SessionBuilder<SG, TG, E, LocalSession<SG, TG, E>>
207{
208    fn local() -> Self {
209        SessionBuilder {
210            uri: None,
211            storage: None,
212            trigger: None,
213            executor: None,
214            session: None,
215        }
216    }
217}
218
/// Constructor trait for values that describe a remote session.
pub trait RemoteSessionFactory {
    /// Creates the remote variant of the implementing type.
    fn remote() -> Self;
}
222
223pub struct RemoteSession<SG: Storage + 'static, TG: Trigger + 'static, E: Executor + 'static> {
224    manager: Option<ScheduleManager<SG, TG, E>>,
225    #[allow(dead_code)]
226    trigger: Option<TG>,
227    #[allow(dead_code)]
228    storage: Option<SG>,
229    #[allow(dead_code)]
230    executor: Option<E>,
231}
232
233impl<SG: Storage, TG: Trigger, E: Executor> RemoteSession<SG, TG, E> {
234    pub fn new(storage: Option<SG>, trigger: Option<TG>, executor: Option<E>) -> Self {
235        RemoteSession {
236            manager: None,
237            trigger,
238            storage,
239            executor,
240        }
241    }
242}
243
244impl<SG: Storage, TG: Trigger, E: Executor> Service for RemoteSession<SG, TG, E> {
245    fn start(&mut self) {
246        todo!()
247    }
248
249    fn stop(&mut self) {
250        todo!()
251    }
252}
253
254impl<SG: Storage, TG: Trigger, E: Executor> Drop for RemoteSession<SG, TG, E> {
255    fn drop(&mut self) {
256        if let Some(ref mut m) = self.manager {
257            m.stop();
258        }
259    }
260}
261
262impl<SG: Storage, TG: Trigger, E: Executor> Session for RemoteSession<SG, TG, E> {
263    fn submit_runnable(&mut self, _: RunnableHolder) -> Result<()> {
264        todo!()
265    }
266
267    fn build_session(&mut self) -> Result<()> {
268        // let storage = self
269        //     .storage
270        //     .take()
271        //     .ok_or(BronzeError::msg("Please set storage first"))?;
272        // let trigger = self
273        //     .trigger
274        //     .take()
275        //     .ok_or(BronzeError::msg("Please set trigger first"))?;
276        // let executor = self
277        //     .executor
278        //     .take()
279        //     .ok_or(BronzeError::msg("Please set executor first"))?;
280        Ok(())
281    }
282}
283
284impl<SG: Storage, TG: Trigger, E: Executor> SessionBuilder<SG, TG, E, RemoteSession<SG, TG, E>> {
285    pub fn build(mut self) -> Result<RemoteSession<SG, TG, E>> {
286        let (storage, trigger, executor) = self.get()?;
287        let mut session = RemoteSession::new(Some(storage), Some(trigger), Some(executor));
288        session.build_session()?;
289        Ok(session)
290    }
291}
292
293impl<SG: Storage, TG: Trigger, E: Executor> RemoteSessionFactory
294    for SessionBuilder<SG, TG, E, RemoteSession<SG, TG, E>>
295{
296    fn remote() -> Self {
297        SessionBuilder {
298            uri: None,
299            storage: None,
300            trigger: None,
301            executor: None,
302            session: None,
303        }
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag;
    use crate::prelude::*;
    use std::{thread, time};

    /// Builds the small nested DAG shared by the tests below.
    fn get_dag() -> DAG {
        dag!(
            "P1A" => ||println!("P1-A") => dag!(
                "P1A1" => ||println!("P1-A1")
            )
        )
        .build()
        .unwrap()
    }

    /// Builds a local session from the given components, submits `d` and waits a second.
    fn test_local_session(
        d: DAG,
        trigger: impl Trigger + 'static,
        storage: impl Storage + 'static,
        executor: impl Executor + 'static,
    ) {
        let mut s = SessionBuilder::local()
            .trigger(trigger)
            .storage(storage)
            .executor(executor)
            .build()
            .unwrap();
        s.submit("1/10 * * * * *", d).unwrap();
        thread::sleep(time::Duration::from_secs(1));
    }

    #[test]
    fn create_default_local_session() {
        let mut s = SessionBuilder::default().build().unwrap();
        let d = get_dag();
        s.submit("1/10 * * * * *", d).unwrap();
        thread::sleep(time::Duration::from_secs(1));
    }

    #[test]
    fn create_local_session() {
        let mut s = SessionBuilder::local()
            .trigger(ThreadTrigger::new())
            .storage(MemoryStorage::new())
            .executor(DefaultExecutor::default())
            .build()
            .unwrap();
        let d = get_dag();
        s.submit("1/10 * * * * *", d).unwrap();
        thread::sleep(time::Duration::from_secs(1));
    }

    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn create_local_session_with_tokio_runtime() {
        use std::sync::Arc;
        let tokio_rt = Arc::new(TokioRuntime::new());
        let mut s = SessionBuilder::local()
            .trigger(TokioTrigger::new(Arc::clone(&tokio_rt)))
            .storage(MemoryStorage::new())
            .executor(TokioExecutor::new(tokio_rt))
            .build()
            .unwrap();

        let d = get_dag();
        let f = || async {
            println!("Im async function");
        };
        s.submit("1/1 * * * * *", d).unwrap();
        s.submit("1/2 * * * * *", AsyncFn(f)).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
    }

    #[test]
    fn create_and_run() {
        let d = get_dag();
        test_local_session(
            d,
            ThreadTrigger::new(),
            MemoryStorage::new(),
            DefaultExecutor::default(),
        );
    }

    #[test]
    fn submit_sync_fn() {
        let mut s = SessionBuilder::default().build().unwrap();
        s.submit("1/1 * * * * *", || println!("I am sync function"))
            .unwrap();
        thread::sleep(time::Duration::from_secs(2));
    }

    #[cfg(feature = "async_tokio")]
    #[tokio::test]
    async fn run_async_function() {
        use std::sync::Arc;
        let tokio_rt = Arc::new(TokioRuntime::new());
        let mut s = SessionBuilder::local()
            .trigger(TokioTrigger::new(Arc::clone(&tokio_rt)))
            .storage(MemoryStorage::new())
            .executor(TokioExecutor::new(tokio_rt))
            .build()
            .unwrap();
        s.submit(
            "1/1 * * * * *",
            AsyncFn(|| async { println!("I am asynchronous task") }),
        )
        .unwrap();
        // Keep the session alive long enough for the one-second schedule to fire,
        // as the other tokio test does; otherwise it is dropped right after submit.
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
    }
}