cronback_scheduler_srv/
lib.rs

1pub(crate) mod db_model;
2pub(crate) mod error;
3pub(crate) mod handler;
4pub(crate) mod spinner;
5pub(crate) mod trigger_store;
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use handler::SchedulerAPIHandler;
11use lib::database::Database;
12use lib::grpc_client_provider::GrpcClientProvider;
13use lib::{netutils, service};
14use proto::scheduler_proto::scheduler_server::SchedulerServer;
15
16use crate::spinner::controller::SpinnerController;
17use crate::trigger_store::SqlTriggerStore;
18
19#[tracing::instrument(skip_all, fields(service = context.service_name()))]
20pub async fn start_scheduler_server(
21    mut context: service::ServiceContext,
22) -> anyhow::Result<()> {
23    let config = context.load_config();
24
25    let db = Database::connect(&config.scheduler.database_uri).await?;
26    db.migrate().await?;
27    let trigger_store = SqlTriggerStore::new(db);
28
29    let dispatcher_clients = Arc::new(GrpcClientProvider::new(context.clone()));
30
31    let controller = Arc::new(SpinnerController::new(
32        context.clone(),
33        Box::new(trigger_store),
34        dispatcher_clients,
35    ));
36
37    let addr =
38        netutils::parse_addr(&config.scheduler.address, config.scheduler.port)
39            .unwrap();
40    controller.start().await?;
41
42    let async_es = controller.clone();
43    let db_flush_s = config.scheduler.db_flush_s;
44    tokio::spawn(async move {
45        let sleep = Duration::from_secs(db_flush_s);
46        loop {
47            tokio::time::sleep(sleep).await;
48            async_es.perform_checkpoint().await;
49        }
50    });
51
52    let handler = SchedulerAPIHandler::new(context.clone(), controller.clone());
53    let svc = SchedulerServer::new(handler);
54
55    // grpc server
56    service::grpc_serve_tcp(
57        &mut context,
58        addr,
59        svc,
60        config.scheduler.request_processing_timeout_s,
61    )
62    .await;
63
64    controller.shutdown().await;
65    Ok(())
66}
67
68pub mod test_helpers {
69    use std::sync::Arc;
70
71    use lib::clients::scheduler_client::ScopedSchedulerClient;
72    use lib::database::Database;
73    use lib::grpc_client_provider::test_helpers::TestGrpcClientProvider;
74    use lib::grpc_client_provider::GrpcClientProvider;
75    use lib::service::{self, ServiceContext};
76    use proto::scheduler_proto::scheduler_server::SchedulerServer;
77    use tempfile::NamedTempFile;
78    use tokio::task::JoinHandle;
79
80    use crate::handler::SchedulerAPIHandler;
81    use crate::spinner::controller::SpinnerController;
82    use crate::trigger_store::SqlTriggerStore;
83
84    pub async fn test_server_and_client(
85        mut context: ServiceContext,
86    ) -> (
87        JoinHandle<()>,
88        TestGrpcClientProvider<ScopedSchedulerClient>,
89    ) {
90        let socket = NamedTempFile::new().unwrap();
91        let socket = Arc::new(socket.into_temp_path());
92        std::fs::remove_file(&*socket).unwrap();
93
94        let dispatcher_client_provider =
95            Arc::new(GrpcClientProvider::new(context.clone()));
96
97        let db = Database::in_memory().await.unwrap();
98        let trigger_store = SqlTriggerStore::new(db);
99        let controller = Arc::new(SpinnerController::new(
100            context.clone(),
101            Box::new(trigger_store),
102            dispatcher_client_provider,
103        ));
104        controller.start().await.unwrap();
105
106        let handler =
107            SchedulerAPIHandler::new(context.clone(), controller.clone());
108        let svc = SchedulerServer::new(handler);
109
110        let cloned_socket = Arc::clone(&socket);
111
112        let serve_future = tokio::spawn(async move {
113            let request_processing_timeout_s = 3;
114            service::grpc_serve_unix(
115                &mut context,
116                &*cloned_socket,
117                svc,
118                request_processing_timeout_s,
119            )
120            .await;
121        });
122
123        // Give the server time to start.
124        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
125
126        let client_provider = TestGrpcClientProvider::new_single_shard(socket);
127
128        (serve_future, client_provider)
129    }
130}