use crate::CliConfiguration;
use crate::Result;
use crate::TetcoreCli;
use chrono::prelude::*;
use futures::pin_mut;
use futures::select;
use futures::{future, future::FutureExt, Future};
use log::info;
use tc_service::{Configuration, TaskType, TaskManager};
use tc_telemetry::{TelemetryHandle, TelemetryWorker};
use tetcore_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::marker::PhantomData;
use tc_service::Error as ServiceError;
use crate::error::Error as CliError;
#[cfg(target_family = "unix")]
async fn main<F, E>(func: F) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::FusedFuture,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
use tokio::signal::unix::{signal, SignalKind};
let mut stream_int = signal(SignalKind::interrupt()).map_err(ServiceError::Io)?;
let mut stream_term = signal(SignalKind::terminate()).map_err(ServiceError::Io)?;
let t1 = stream_int.recv().fuse();
let t2 = stream_term.recv().fuse();
let t3 = func;
pin_mut!(t1, t2, t3);
select! {
_ = t1 => {},
_ = t2 => {},
res = t3 => res?,
}
Ok(())
}
#[cfg(not(unix))]
async fn main<F, E>(func: F) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::FusedFuture,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
use tokio::signal::ctrl_c;
let t1 = ctrl_c().fuse();
let t2 = func;
pin_mut!(t1, t2);
select! {
_ = t1 => {},
res = t2 => res?,
}
Ok(())
}
pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new()
.threaded_scheduler()
.on_thread_start(|| {
TOKIO_THREADS_ALIVE.inc();
TOKIO_THREADS_TOTAL.inc();
})
.on_thread_stop(|| {
TOKIO_THREADS_ALIVE.dec();
})
.enable_all()
.build()
}
fn run_until_exit<F, E>(
mut tokio_runtime: tokio::runtime::Runtime,
future: F,
task_manager: TaskManager,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>> + future::Future,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
let f = future.fuse();
pin_mut!(f);
tokio_runtime.block_on(main(f))?;
tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(())
}
pub struct Runner<C: TetcoreCli> {
config: Configuration,
tokio_runtime: tokio::runtime::Runtime,
telemetry_worker: TelemetryWorker,
phantom: PhantomData<C>,
}
impl<C: TetcoreCli> Runner<C> {
pub fn new<T: CliConfiguration>(
cli: &C,
command: &T,
telemetry_worker: TelemetryWorker,
) -> Result<Runner<C>> {
let tokio_runtime = build_runtime()?;
let runtime_handle = tokio_runtime.handle().clone();
let task_executor = move |fut, task_type| {
match task_type {
TaskType::Async => runtime_handle.spawn(fut).map(drop),
TaskType::Blocking =>
runtime_handle.spawn_blocking(move || futures::executor::block_on(fut))
.map(drop),
}
};
let telemetry_handle = telemetry_worker.handle();
Ok(Runner {
config: command.create_configuration(
cli,
task_executor.into(),
Some(telemetry_handle),
)?,
tokio_runtime,
telemetry_worker,
phantom: PhantomData,
})
}
fn print_node_infos(&self) {
info!("{}", C::impl_name());
info!("✌️ version {}", C::impl_version());
info!(
"❤️ by {}, {}-{}",
C::author(),
C::copyright_start_year(),
Local::today().year(),
);
info!("📋 Chain specification: {}", self.config.chain_spec.name());
info!("🏷 Node name: {}", self.config.network.node_name);
info!("👤 Role: {}", self.config.display_role());
info!("💾 Database: {} at {}",
self.config.database,
self.config.database.path().map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
);
info!("⛓ Native runtime: {}", C::native_runtime_version(&self.config.chain_spec));
}
pub fn run_node_until_exit<F, E>(
mut self,
initialize: impl FnOnce(Configuration) -> F,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<TaskManager, E>>,
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
self.print_node_infos();
let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
task_manager.spawn_handle().spawn("telemetry_worker", self.telemetry_worker.run());
let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
self.tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(res?)
}
pub fn sync_run<E>(
self,
runner: impl FnOnce(Configuration) -> std::result::Result<(), E>
) -> std::result::Result<(), E>
where
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
{
runner(self.config)
}
pub fn async_run<F, E>(
self, runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
) -> std::result::Result<(), E>
where
F: Future<Output = std::result::Result<(), E>>,
E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
{
let (future, task_manager) = runner(self.config)?;
run_until_exit::<_, E>(self.tokio_runtime, future, task_manager)
}
pub fn config(&self) -> &Configuration {
&self.config
}
pub fn config_mut(&mut self) -> &mut Configuration {
&mut self.config
}
pub fn telemetry_handle(&self) -> TelemetryHandle {
self.telemetry_worker.handle()
}
}