cronback_dispatcher_srv/
lib.rs1mod actions;
2mod dispatch_manager;
3mod handler;
4mod retry;
5
6use std::sync::Arc;
7
8use lib::database::attempt_log_store::{AttemptLogStore, SqlAttemptLogStore};
9use lib::database::run_store::{RunStore, SqlRunStore};
10use lib::database::Database;
11use lib::{netutils, service};
12use proto::dispatcher_proto::dispatcher_server::DispatcherServer;
13use tracing::info;
14
15use crate::dispatch_manager::DispatchManager;
16
17#[tracing::instrument(skip_all, fields(service = context.service_name()))]
18pub async fn start_dispatcher_server(
19 mut context: service::ServiceContext,
20) -> anyhow::Result<()> {
21 let config = context.load_config();
22 let addr = netutils::parse_addr(
23 &config.dispatcher.address,
24 config.dispatcher.port,
25 )
26 .unwrap();
27
28 let db = Database::connect(&config.dispatcher.database_uri).await?;
29 db.migrate().await?;
30 let attempt_store: Arc<dyn AttemptLogStore + Send + Sync> =
31 Arc::new(SqlAttemptLogStore::new(db.clone()));
32
33 let run_store: Arc<dyn RunStore + Send + Sync> =
34 Arc::new(SqlRunStore::new(db));
35
36 let dispatch_manager = DispatchManager::new(
37 config.dispatcher.cell_id,
38 run_store.clone(),
39 attempt_store.clone(),
40 );
41 dispatch_manager.start().await?;
42
43 let handler = handler::DispatcherAPIHandler::new(
44 context.clone(),
45 dispatch_manager,
46 run_store,
47 );
48 let svc = DispatcherServer::new(handler);
49
50 info!("Starting Dispatcher on {:?}", addr);
52
53 service::grpc_serve_tcp(
55 &mut context,
56 addr,
57 svc,
58 config.dispatcher.request_processing_timeout_s,
59 )
60 .await;
61
62 Ok(())
63}