1use crate::executor::DefaultExecutor;
2use crate::manager::ScheduleManager;
3use crate::prelude::{Executor, ThreadTrigger, Trigger, DAG};
4use crate::service::Service;
5use crate::store::{MemoryStorage, Storage};
6use crate::task::RunnableHolder;
7use bronzeflow_time::prelude::ScheduleExpr;
8use bronzeflow_utils::{debug, BronzeError, Result};
9use std::fmt::Debug;
10
11pub trait Session: Service {
12 fn submit<D, S>(&mut self, s: S, try_into_dag: D) -> Result<()>
13 where
14 D: Into<DAG>,
15 S: TryInto<ScheduleExpr>,
16 BronzeError: From<S::Error>,
17 {
18 let schedule_expr = s.try_into()?;
19 let mut dag = try_into_dag.into();
20 dag.set_schedule(schedule_expr);
21 dag.prepare();
22
23 let runnable = if dag.cal_task_nums() == 1 {
24 debug!("Dag has one task, transform it to single task");
25 let task = dag.to_single_task()?;
26 RunnableHolder::Task(task)
27 } else {
28 RunnableHolder::Dag(dag)
29 };
30 self.submit_runnable(runnable)
31 }
32
33 fn submit_runnable(&mut self, runnable: RunnableHolder) -> Result<()>;
34
35 fn build_session(&mut self) -> Result<()>;
36}
37
38pub struct LocalSession<SG: Storage + 'static, TG: Trigger + 'static, E: Executor + 'static> {
39 manager: Option<ScheduleManager<SG, TG, E>>,
40 trigger: Option<TG>,
41 storage: Option<SG>,
42 executor: Option<E>,
43}
44
45impl<SG: Storage, TG: Trigger, E: Executor> LocalSession<SG, TG, E> {
46 pub fn new(storage: Option<SG>, trigger: Option<TG>, executor: Option<E>) -> Self {
47 LocalSession {
48 manager: None,
49 trigger,
50 storage,
51 executor,
52 }
53 }
54}
55
56impl<SG: Storage, TG: Trigger, E: Executor> Service for LocalSession<SG, TG, E> {
57 fn start(&mut self) {
58 self.manager.as_mut().unwrap().start();
59 }
60
61 fn stop(&mut self) {
62 self.manager.as_mut().unwrap().stop();
63 }
64}
65
66impl<SG: Storage, TG: Trigger, E: Executor> Drop for LocalSession<SG, TG, E> {
67 fn drop(&mut self) {
68 if let Some(ref mut m) = self.manager {
69 m.stop();
70 }
71 }
72}
73
74impl<SG: Storage, TG: Trigger, E: Executor> Session for LocalSession<SG, TG, E> {
75 fn submit_runnable(&mut self, runnable: RunnableHolder) -> Result<()> {
76 self.manager.as_mut().unwrap().add_runnable(runnable);
77 Ok(())
78 }
79
80 fn build_session(&mut self) -> Result<()> {
81 let storage = self
82 .storage
83 .take()
84 .ok_or_else(|| BronzeError::msg("Please set storage first"))?;
85 let trigger = self
86 .trigger
87 .take()
88 .ok_or_else(|| BronzeError::msg("Please set trigger first"))?;
89 let executor = self
90 .executor
91 .take()
92 .ok_or_else(|| BronzeError::msg("Please set executor first"))?;
93 self.manager = Some(ScheduleManager::new(storage, trigger, executor));
94 Ok(())
95 }
96}
97
98#[derive(Debug, Clone)]
99pub struct SessionBuilder<
100 SG: Storage = MemoryStorage,
101 TG: Trigger = ThreadTrigger,
102 E: Executor = DefaultExecutor,
103 S: Session = LocalSession<SG, TG, E>,
104> {
105 uri: Option<String>,
107
108 trigger: Option<TG>,
110
111 storage: Option<SG>,
113
114 executor: Option<E>,
115
116 #[allow(dead_code)]
118 session: Option<S>,
119}
120
121impl SessionBuilder {
122 pub fn set_uri(mut self, uri: &str) -> Self {
123 self.uri = Some(uri.to_string());
124 self
125 }
126}
127
128impl<SG: Storage, TG: Trigger, E: Executor, S: Session> SessionBuilder<SG, TG, E, S> {
129 pub fn trigger(mut self, trigger: TG) -> Self {
130 self.trigger = Some(trigger);
131 self
132 }
133
134 pub fn storage(mut self, storage: SG) -> Self {
135 self.storage = Some(storage);
136 self
137 }
138
139 pub fn executor(mut self, executor: E) -> Self {
140 self.executor = Some(executor);
141 self
142 }
143
144 pub fn get(&mut self) -> Result<(SG, TG, E)> {
145 let storage = self
146 .storage
147 .take()
148 .ok_or_else(|| BronzeError::msg("Please set storage first"))?;
149 let trigger = self
150 .trigger
151 .take()
152 .ok_or_else(|| BronzeError::msg("Please set trigger first"))?;
153 let executor = self
154 .executor
155 .take()
156 .ok_or_else(|| BronzeError::msg("Please set executor first"))?;
157 Ok((storage, trigger, executor))
158 }
159}
160
161impl<SG: Storage, TG: Trigger, E: Executor> SessionBuilder<SG, TG, E, LocalSession<SG, TG, E>> {
162 pub fn build(mut self) -> Result<LocalSession<SG, TG, E>> {
163 let (storage, trigger, executor) = self.get()?;
164 let mut session = LocalSession::new(Some(storage), Some(trigger), Some(executor));
165 let _ = &session.build_session()?;
166 session.start();
167 Ok(session)
168 }
169}
170
171pub trait LocalSessionFactory {
172 fn local() -> Self;
173}
174
175pub trait DefaultSessionFactory {
176 fn default() -> Self;
177}
178
179impl DefaultSessionFactory
180 for SessionBuilder<
181 MemoryStorage,
182 ThreadTrigger,
183 DefaultExecutor,
184 LocalSession<MemoryStorage, ThreadTrigger, DefaultExecutor>,
185 >
186{
187 fn default() -> Self {
188 let storage = Some(MemoryStorage::new());
189 let trigger = Some(ThreadTrigger::new());
190 let executor = Some(DefaultExecutor::default());
191 SessionBuilder {
192 uri: None,
193 storage,
194 trigger,
195 executor,
196 session: Some(LocalSession::new(
197 Some(MemoryStorage::new()),
198 Some(ThreadTrigger::new()),
199 Some(DefaultExecutor::default()),
200 )),
201 }
202 }
203}
204
205impl<SG: Storage, TG: Trigger, E: Executor> LocalSessionFactory
206 for SessionBuilder<SG, TG, E, LocalSession<SG, TG, E>>
207{
208 fn local() -> Self {
209 SessionBuilder {
210 uri: None,
211 storage: None,
212 trigger: None,
213 executor: None,
214 session: None,
215 }
216 }
217}
218
219pub trait RemoteSessionFactory {
220 fn remote() -> Self;
221}
222
223pub struct RemoteSession<SG: Storage + 'static, TG: Trigger + 'static, E: Executor + 'static> {
224 manager: Option<ScheduleManager<SG, TG, E>>,
225 #[allow(dead_code)]
226 trigger: Option<TG>,
227 #[allow(dead_code)]
228 storage: Option<SG>,
229 #[allow(dead_code)]
230 executor: Option<E>,
231}
232
233impl<SG: Storage, TG: Trigger, E: Executor> RemoteSession<SG, TG, E> {
234 pub fn new(storage: Option<SG>, trigger: Option<TG>, executor: Option<E>) -> Self {
235 RemoteSession {
236 manager: None,
237 trigger,
238 storage,
239 executor,
240 }
241 }
242}
243
244impl<SG: Storage, TG: Trigger, E: Executor> Service for RemoteSession<SG, TG, E> {
245 fn start(&mut self) {
246 todo!()
247 }
248
249 fn stop(&mut self) {
250 todo!()
251 }
252}
253
254impl<SG: Storage, TG: Trigger, E: Executor> Drop for RemoteSession<SG, TG, E> {
255 fn drop(&mut self) {
256 if let Some(ref mut m) = self.manager {
257 m.stop();
258 }
259 }
260}
261
262impl<SG: Storage, TG: Trigger, E: Executor> Session for RemoteSession<SG, TG, E> {
263 fn submit_runnable(&mut self, _: RunnableHolder) -> Result<()> {
264 todo!()
265 }
266
267 fn build_session(&mut self) -> Result<()> {
268 Ok(())
281 }
282}
283
284impl<SG: Storage, TG: Trigger, E: Executor> SessionBuilder<SG, TG, E, RemoteSession<SG, TG, E>> {
285 pub fn build(mut self) -> Result<RemoteSession<SG, TG, E>> {
286 let (storage, trigger, executor) = self.get()?;
287 let mut session = RemoteSession::new(Some(storage), Some(trigger), Some(executor));
288 session.build_session()?;
289 Ok(session)
290 }
291}
292
293impl<SG: Storage, TG: Trigger, E: Executor> RemoteSessionFactory
294 for SessionBuilder<SG, TG, E, RemoteSession<SG, TG, E>>
295{
296 fn remote() -> Self {
297 SessionBuilder {
298 uri: None,
299 storage: None,
300 trigger: None,
301 executor: None,
302 session: None,
303 }
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use crate::dag;
311 use crate::prelude::*;
312 use std::{thread, time};
313
314 fn get_dag() -> DAG {
315 dag!(
316 "P1A" => ||println!("P1-A") => dag!(
317 "P1A1" => ||println!("P1-A1")
318 )
319 )
320 .build()
321 .unwrap()
322 }
323
324 fn test_local_session(
325 d: DAG,
326 trigger: impl Trigger + 'static,
327 storage: impl Storage + 'static,
328 executor: impl Executor + 'static,
329 ) {
330 let mut s = SessionBuilder::local()
331 .trigger(trigger)
332 .storage(storage)
333 .executor(executor)
334 .build()
335 .unwrap();
336 s.submit("1/10 * * * * *", d).unwrap();
337 thread::sleep(time::Duration::from_secs(1));
338 }
339
340 #[test]
341 fn create_default_local_session() {
342 let mut s = SessionBuilder::default().build().unwrap();
343 let d = get_dag();
344 s.submit("1/10 * * * * *", d).unwrap();
345 thread::sleep(time::Duration::from_secs(1));
346 }
347
348 #[test]
349 fn create_local_session() {
350 let mut s = SessionBuilder::local()
352 .trigger(ThreadTrigger::new())
353 .storage(MemoryStorage::new())
354 .executor(DefaultExecutor::default())
355 .build()
356 .unwrap();
357 let d = get_dag();
358 s.submit("1/10 * * * * *", d).unwrap();
359 thread::sleep(time::Duration::from_secs(1));
360 }
361
362 #[cfg(feature = "async_tokio")]
363 #[tokio::test]
364 async fn create_local_session_with_tokio_runtime() {
365 use std::sync::Arc;
366 let tokio_rt = Arc::new(TokioRuntime::new());
367 let mut s = SessionBuilder::local()
368 .trigger(TokioTrigger::new(Arc::clone(&tokio_rt)))
369 .storage(MemoryStorage::new())
370 .executor(TokioExecutor::new(tokio_rt))
371 .build()
372 .unwrap();
373
374 let d = get_dag();
375 let f = || async {
376 println!("Im async function");
377 };
378 s.submit("1/1 * * * * *", d).unwrap();
379 s.submit("1/2 * * * * *", AsyncFn(f)).unwrap();
380 tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
381 }
382
383 #[test]
384 fn create_and_run() {
385 let d = get_dag();
386 test_local_session(
387 d,
388 ThreadTrigger::new(),
389 MemoryStorage::new(),
390 DefaultExecutor::default(),
391 );
392 }
393
394 #[test]
395 fn submit_sync_fn() {
396 let mut s = SessionBuilder::default().build().unwrap();
397 s.submit("1/1 * * * * *", || println!("I am sync function"))
398 .unwrap();
399 thread::sleep(time::Duration::from_secs(2));
400 }
401
402 #[cfg(feature = "async_tokio")]
403 #[tokio::test]
404 async fn run_async_unction() {
405 use std::sync::Arc;
406 let tokio_rt = Arc::new(TokioRuntime::new());
407 let mut s = SessionBuilder::local()
408 .trigger(TokioTrigger::new(Arc::clone(&tokio_rt)))
409 .storage(MemoryStorage::new())
410 .executor(TokioExecutor::new(tokio_rt))
411 .build()
412 .unwrap();
413 s.submit(
414 "1/1 * * * * *",
415 AsyncFn(|| async { println!("I am asynchronous task") }),
416 )
417 .unwrap();
418 }
419}