1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
use crate::{config::StateServerConfig, grpc_server::StateServer};

use eth_state_fold::Foldable;
use eth_state_fold_types::ethers::providers::Middleware;
use eth_state_server_common::state_fold_server::state_fold_server::StateFoldServer;

use std::sync::Arc;
use tokio::{select, signal, sync::oneshot};
use tonic::transport::Server;

pub async fn start_server<
    M: Middleware + 'static,
    UD: Send + Sync + 'static,
    F: Foldable<UserData = UD> + 'static,
>(
    config: &StateServerConfig,
    state_server: StateServer<M, UD, F>,
    kill_switch: oneshot::Receiver<()>,
) -> Result<(), tonic::transport::Error>
where
    F::InitialState: serde::de::DeserializeOwned + 'static,
    F: serde::Serialize,
{
    let (mut health_reporter, health_server) = tonic_health::server::health_reporter();

    health_reporter
        .set_serving::<StateFoldServer<StateServer<M, UD, F>>>()
        .await;

    let block_subscriber = Arc::clone(&state_server.block_subscriber);

    tracing::info!("StateFoldServer listening on {}", config.server_address);

    Server::builder()
        .trace_fn(|_| tracing::trace_span!("state_fold_server"))
        .add_service(health_server)
        .add_service(
            StateFoldServer::new(state_server)
                .max_decoding_message_size(config.max_decoding_message_size),
        )
        .serve_with_shutdown(config.server_address, async {
            select! {
                r = block_subscriber.wait_for_completion() => {
                    tracing::error!("`block_subscriber` has exited: {:?}", r);
                    tracing::error!("Shutting down...");
                }

                r = kill_switch => {
                    tracing::info!("Graceful context shutdown: {:?}", r);
                }
            }
        })
        .await
}

pub async fn wait_for_signal(tx: oneshot::Sender<()>) {
    let _ = signal::ctrl_c().await;
    tracing::info!("SIGINT received: shutting down");
    let _ = tx.send(());
}