use std::io::Error;
#[allow(dead_code)]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use libc::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR1};
use lunchbox::LocalFS;
use lunchbox::ReadableFileSystem;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, trace};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::{filter, prelude::*, reload, Registry};
use wora::errors::*;
use wora::events::Event;
use wora::exec::*;
use wora::exec_unix::*;
use wora::metrics::*;
use wora::restart_policy::MainRetryAction;
use wora::*;
#[derive(Clone, Debug, ValueEnum, Serialize, Deserialize)]
pub enum RunMode {
Sys,
User,
}
#[derive(Clone, Debug, Parser, Serialize, Deserialize)]
#[command(name = "async_daemon")]
#[command(author, version, about = "async wora daemon example", long_about = None)]
#[command(propagate_version = true)]
pub struct DaemonArgs {
#[arg(short, long, value_enum, default_value_t=RunMode::User)]
pub run_mode: RunMode,
}
#[derive(Default, Deserialize)]
struct Obj {
t_or_f: bool,
list: Vec<String>,
}
#[derive(Default, Deserialize)]
pub struct DaemonConfig {
str: String,
num: Option<u16>,
obj: Obj,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DaemonState {}
type DaemonSharedState = Arc<RwLock<DaemonState>>;
struct DaemonApp {
args: DaemonArgs,
state: DaemonSharedState,
log_reload_handle: tracing_subscriber::reload::Handle<filter::LevelFilter, Registry>,
config: DaemonConfig,
}
impl Config for DaemonConfig {
type ConfigT = DaemonConfig;
fn parse_main_config_file(data: String) -> Result<DaemonConfig, Box<dyn std::error::Error>> {
match toml::from_str(&data) {
Ok(v) => return Ok(v),
Err(err) => return Err(Box::new(err)),
}
}
fn parse_supplemental_config_file(
_file_path: PathBuf,
data: String,
) -> Result<DaemonConfig, Box<dyn std::error::Error>> {
match toml::from_str(&data) {
Ok(v) => return Ok(v),
Err(err) => return Err(Box::new(err)),
}
}
}
#[async_trait]
impl App<()> for DaemonApp {
type AppMetricsProducer = MetricsProducerStdout;
type AppConfig = DaemonConfig;
fn name(&self) -> &'static str {
"async_daemon"
}
async fn setup(
&mut self,
wora: &Wora<()>,
exec: &(dyn Executor + Send + Sync),
fs: &LocalFS,
_metrics: &(dyn MetricProcessor + Send + Sync),
) -> Result<(), Box<dyn std::error::Error>> {
debug!("{:?}", wora.stats_from_start());
let args = DaemonArgs::parse();
self.args = args;
debug!("{:?}", exec.disable_core_dumps());
Ok(())
}
async fn main(
&mut self,
wora: &mut Wora<()>,
exec: &(dyn Executor + Send + Sync),
_metrics: &mut (dyn MetricProcessor + Send + Sync),
) -> MainRetryAction {
info!("waiting for events...");
while let Some(ev) = wora.receiver.recv().await {
info!("event: {:?}", &ev);
match ev {
Event::UnixSignal(signum) => match signum {
SIGTERM | SIGINT | SIGQUIT => return MainRetryAction::UseExitCode(1),
SIGHUP => {
info!("sighup!");
}
SIGUSR1 => {
self.log_reload_handle.modify(|filter| {
*filter = filter::LevelFilter::TRACE;
});
trace!("changed log level to trace");
}
_ => {}
},
Event::Shutdown(dt) => {
info!("shutting down at {:?}", dt);
return MainRetryAction::Success;
}
Event::SystemResource(_) => {}
Event::ConfigChange(_file, data) => {
match DaemonConfig::parse_main_config_file(data) {
Ok(cfg) => {
info!("config changed");
self.config = cfg;
}
Err(err) => {
error!("failed to parse config{:?}", err);
}
}
}
Event::Suspended(dt) => {
info!("suspending at {:?}", dt);
}
Event::LogRotation => {
info!("rotating log");
}
Event::LeadershipChange(old_state, new_state) => {
info!(
"leadership has changed from state {:?} to {:?}",
old_state, new_state
);
}
Event::App(_) => {}
_ => {}
}
}
MainRetryAction::Success
}
async fn is_healthy() -> HealthState {
HealthState::Ok
}
async fn end(
&mut self,
_wora: &Wora<()>,
_exec: &(dyn Executor + Send + Sync),
_metrics: &(dyn MetricProcessor + Send + Sync),
) {
()
}
}
#[tokio::main]
async fn main() -> Result<(), MainEarlyReturn> {
let filter = filter::LevelFilter::TRACE;
let (filter, reload_handle) = reload::Layer::new(filter);
let format = tracing_subscriber::fmt::format()
.with_file(true)
.with_line_number(true)
.with_level(true) .with_target(true) .with_thread_ids(true) .with_thread_names(true) ; let x = tracing_subscriber::fmt::layer()
.event_format(format)
.with_span_events(FmtSpan::CLOSE | FmtSpan::ENTER);
tracing_subscriber::registry().with(filter).with(x).init();
let args = DaemonArgs::parse();
let app_state = DaemonState {};
let app = DaemonApp {
args: args.clone(),
state: Arc::new(RwLock::new(app_state)),
log_reload_handle: reload_handle,
config: DaemonConfig::default(),
};
let metrics = MetricsProducerStdout::new().await;
let fs = LocalFS::new().unwrap();
match &args.run_mode {
RunMode::Sys => {
let exec = UnixLikeSystem::new(app.name()).await;
exec_async_runner(exec, app, fs, metrics).await?
}
RunMode::User => match UnixLikeUser::new(app.name()).await {
Ok(exec) => exec_async_runner(exec, app, fs, metrics).await?,
Err(exec_err) => {
error!("exec error:{}", exec_err);
return Err(MainEarlyReturn::IO(exec_err));
}
},
}
Ok(())
}