cronback_dispatcher_srv/
lib.rs

1mod actions;
2mod dispatch_manager;
3mod handler;
4mod retry;
5
6use std::sync::Arc;
7
8use lib::database::attempt_log_store::{AttemptLogStore, SqlAttemptLogStore};
9use lib::database::run_store::{RunStore, SqlRunStore};
10use lib::database::Database;
11use lib::{netutils, service};
12use proto::dispatcher_proto::dispatcher_server::DispatcherServer;
13use tracing::info;
14
15use crate::dispatch_manager::DispatchManager;
16
17#[tracing::instrument(skip_all, fields(service = context.service_name()))]
18pub async fn start_dispatcher_server(
19    mut context: service::ServiceContext,
20) -> anyhow::Result<()> {
21    let config = context.load_config();
22    let addr = netutils::parse_addr(
23        &config.dispatcher.address,
24        config.dispatcher.port,
25    )
26    .unwrap();
27
28    let db = Database::connect(&config.dispatcher.database_uri).await?;
29    db.migrate().await?;
30    let attempt_store: Arc<dyn AttemptLogStore + Send + Sync> =
31        Arc::new(SqlAttemptLogStore::new(db.clone()));
32
33    let run_store: Arc<dyn RunStore + Send + Sync> =
34        Arc::new(SqlRunStore::new(db));
35
36    let dispatch_manager = DispatchManager::new(
37        config.dispatcher.cell_id,
38        run_store.clone(),
39        attempt_store.clone(),
40    );
41    dispatch_manager.start().await?;
42
43    let handler = handler::DispatcherAPIHandler::new(
44        context.clone(),
45        dispatch_manager,
46        run_store,
47    );
48    let svc = DispatcherServer::new(handler);
49
50    // grpc server
51    info!("Starting Dispatcher on {:?}", addr);
52
53    // The stack of middleware that our service will be wrapped in
54    service::grpc_serve_tcp(
55        &mut context,
56        addr,
57        svc,
58        config.dispatcher.request_processing_timeout_s,
59    )
60    .await;
61
62    Ok(())
63}