cronback_scheduler_srv/
lib.rs1pub(crate) mod db_model;
2pub(crate) mod error;
3pub(crate) mod handler;
4pub(crate) mod spinner;
5pub(crate) mod trigger_store;
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use handler::SchedulerAPIHandler;
11use lib::database::Database;
12use lib::grpc_client_provider::GrpcClientProvider;
13use lib::{netutils, service};
14use proto::scheduler_proto::scheduler_server::SchedulerServer;
15
16use crate::spinner::controller::SpinnerController;
17use crate::trigger_store::SqlTriggerStore;
18
19#[tracing::instrument(skip_all, fields(service = context.service_name()))]
20pub async fn start_scheduler_server(
21 mut context: service::ServiceContext,
22) -> anyhow::Result<()> {
23 let config = context.load_config();
24
25 let db = Database::connect(&config.scheduler.database_uri).await?;
26 db.migrate().await?;
27 let trigger_store = SqlTriggerStore::new(db);
28
29 let dispatcher_clients = Arc::new(GrpcClientProvider::new(context.clone()));
30
31 let controller = Arc::new(SpinnerController::new(
32 context.clone(),
33 Box::new(trigger_store),
34 dispatcher_clients,
35 ));
36
37 let addr =
38 netutils::parse_addr(&config.scheduler.address, config.scheduler.port)
39 .unwrap();
40 controller.start().await?;
41
42 let async_es = controller.clone();
43 let db_flush_s = config.scheduler.db_flush_s;
44 tokio::spawn(async move {
45 let sleep = Duration::from_secs(db_flush_s);
46 loop {
47 tokio::time::sleep(sleep).await;
48 async_es.perform_checkpoint().await;
49 }
50 });
51
52 let handler = SchedulerAPIHandler::new(context.clone(), controller.clone());
53 let svc = SchedulerServer::new(handler);
54
55 service::grpc_serve_tcp(
57 &mut context,
58 addr,
59 svc,
60 config.scheduler.request_processing_timeout_s,
61 )
62 .await;
63
64 controller.shutdown().await;
65 Ok(())
66}
67
68pub mod test_helpers {
69 use std::sync::Arc;
70
71 use lib::clients::scheduler_client::ScopedSchedulerClient;
72 use lib::database::Database;
73 use lib::grpc_client_provider::test_helpers::TestGrpcClientProvider;
74 use lib::grpc_client_provider::GrpcClientProvider;
75 use lib::service::{self, ServiceContext};
76 use proto::scheduler_proto::scheduler_server::SchedulerServer;
77 use tempfile::NamedTempFile;
78 use tokio::task::JoinHandle;
79
80 use crate::handler::SchedulerAPIHandler;
81 use crate::spinner::controller::SpinnerController;
82 use crate::trigger_store::SqlTriggerStore;
83
84 pub async fn test_server_and_client(
85 mut context: ServiceContext,
86 ) -> (
87 JoinHandle<()>,
88 TestGrpcClientProvider<ScopedSchedulerClient>,
89 ) {
90 let socket = NamedTempFile::new().unwrap();
91 let socket = Arc::new(socket.into_temp_path());
92 std::fs::remove_file(&*socket).unwrap();
93
94 let dispatcher_client_provider =
95 Arc::new(GrpcClientProvider::new(context.clone()));
96
97 let db = Database::in_memory().await.unwrap();
98 let trigger_store = SqlTriggerStore::new(db);
99 let controller = Arc::new(SpinnerController::new(
100 context.clone(),
101 Box::new(trigger_store),
102 dispatcher_client_provider,
103 ));
104 controller.start().await.unwrap();
105
106 let handler =
107 SchedulerAPIHandler::new(context.clone(), controller.clone());
108 let svc = SchedulerServer::new(handler);
109
110 let cloned_socket = Arc::clone(&socket);
111
112 let serve_future = tokio::spawn(async move {
113 let request_processing_timeout_s = 3;
114 service::grpc_serve_unix(
115 &mut context,
116 &*cloned_socket,
117 svc,
118 request_processing_timeout_s,
119 )
120 .await;
121 });
122
123 tokio::time::sleep(std::time::Duration::from_millis(250)).await;
125
126 let client_provider = TestGrpcClientProvider::new_single_shard(socket);
127
128 (serve_future, client_provider)
129 }
130}