use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use futures::channel::mpsc::{UnboundedSender, unbounded};
use log::info;
use serde::Deserialize;
use shvclient::client::Full;
use shvproto::RpcValue;
use shvrpc::client::ClientConfig;
use size::Size;
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::RwLock;
use crate::alarm::Alarm;
use self::sites::SiteOnlineStatus;
use self::util::DedupSender;
mod getlog;
mod alarmlog;
mod pushlog;
mod sites;
mod tree;
mod sync;
mod cleanup;
mod dirtylog;
mod util;
pub mod alarm;
pub use crate::util::init_logger;
/// Serde default for `HpConfig::max_sync_tasks`.
const fn max_sync_tasks_default() -> usize {
    const DEFAULT_MAX_SYNC_TASKS: usize = 8;
    DEFAULT_MAX_SYNC_TASKS
}
/// Serde default for `HpConfig::max_journal_dir_size`: 30 GiB.
const fn max_journal_dir_size_default() -> Size {
    let default_bytes = 30 * size::GiB;
    Size::from_const(default_bytes)
}
/// Serde default for `HpConfig::periodic_sync_interval`.
///
/// NOTE(review): the value is `60 * 60`, presumably one hour expressed in
/// seconds — confirm the unit against the code consuming this setting.
const fn periodic_sync_interval_default() -> u64 {
    const FACTOR: u64 = 60;
    FACTOR * FACTOR
}
/// Serde default for `HpConfig::journal_dir`.
fn journal_dir_default() -> String {
    String::from("/tmp/hp-rs/shvjournal")
}
/// Application configuration, deserializable through serde (see `HpConfig::load`).
///
/// Every field carries a serde default, so any of them may be omitted from
/// the deserialized input.
#[derive(Clone, Deserialize)]
pub struct HpConfig {
/// Path of the journal directory; defaults to `journal_dir_default()`.
#[serde(default = "journal_dir_default")]
pub journal_dir: String,
/// Defaults to `max_sync_tasks_default()`.
/// NOTE(review): presumably the cap on concurrently running sync tasks — confirm in `sync`.
#[serde(default = "max_sync_tasks_default")]
pub max_sync_tasks: usize,
/// Size limit for the journal directory; defaults to `max_journal_dir_size_default()`.
/// NOTE(review): how the limit is enforced is not visible here — presumably in `cleanup`.
#[serde(default = "max_journal_dir_size_default")]
pub max_journal_dir_size: Size,
/// Defaults to `periodic_sync_interval_default()`.
/// NOTE(review): unit is presumably seconds — confirm against the consumer.
#[serde(default = "periodic_sync_interval_default")]
pub periodic_sync_interval: u64,
/// Falls back to `i64::default()` (0) when omitted.
/// NOTE(review): the meaning of 0 is not visible here — confirm.
#[serde(default)]
pub days_to_keep: i64,
}
/// Mirrors the per-field serde defaults so that `HpConfig::default()` matches
/// a configuration deserialized from an input with no fields set.
impl Default for HpConfig {
    fn default() -> Self {
        let journal_dir = journal_dir_default();
        let max_sync_tasks = max_sync_tasks_default();
        let max_journal_dir_size = max_journal_dir_size_default();
        let periodic_sync_interval = periodic_sync_interval_default();
        let days_to_keep = i64::default();

        Self {
            journal_dir,
            max_sync_tasks,
            max_journal_dir_size,
            periodic_sync_interval,
            days_to_keep,
        }
    }
}
impl HpConfig {
    /// Reads the file at `config_file` and deserializes its contents with
    /// `serde_yaml_ng` into an `HpConfig`.
    ///
    /// # Errors
    ///
    /// Returns a `String` prefixed with `Config file read error:` when the
    /// file cannot be read, or with `Config file format error:` when the
    /// contents cannot be deserialized.
    pub fn load(config_file: impl AsRef<std::path::Path>) -> Result<Self, String> {
        let raw_text = match std::fs::read_to_string(config_file) {
            Ok(text) => text,
            Err(e) => return Err(format!("Config file read error: {e}")),
        };

        match serde_yaml_ng::from_str::<Self>(&raw_text) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(format!("Config file format error: {e}")),
        }
    }
}
/// An `Alarm` bundled with a timestamp and a `stale` flag.
///
/// Converts into an `RpcValue` through the `From<AlarmWithTimestamp>` impl,
/// which adds `"timestamp"` and `"stale"` entries to the alarm's RPC map.
#[derive(Clone)]
struct AlarmWithTimestamp {
// The wrapped alarm.
alarm: Alarm,
// NOTE(review): presumably the time the alarm was raised or last updated — confirm at the producer.
timestamp: shvproto::DateTime,
// NOTE(review): presumably marks an alarm whose source data is no longer current — confirm at the producer.
stale: bool,
}
impl From<AlarmWithTimestamp> for RpcValue {
fn from(value: AlarmWithTimestamp) -> Self {
let mut alarm_map = value.alarm.into_rpc_map(true);
alarm_map.insert("timestamp".to_string(), value.timestamp.into());
alarm_map.insert("stale".to_string(), value.stale.into());
alarm_map.into()
}
}
/// Shared application state, built in `make_client` and handed out as `Arc<State>`.
pub struct State {
// Captured with `Instant::now()` when the state is constructed.
start_time: std::time::Instant,
// Starts out as `SitesData::default()`.
sites_data: RwLock<sites::SitesData>,
// Starts out as `SyncInfo::default()`.
sync_info: sync::SyncInfo,
// NOTE(review): presumably keyed by site path — confirm at the writers.
alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
// Same shape as `alarms`; NOTE(review): how the two maps differ is not visible here.
state_alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
// NOTE(review): presumably keyed the same way as `alarms` — confirm at the writers.
online_states: RwLock<BTreeMap<String, SiteOnlineStatus>>,
// Clone of the configuration passed to `make_client`.
config: HpConfig,
// Sending half of the dedup channel whose receiver is given to `sync::sync_task`.
sync_cmd_tx: DedupSender<sync::SyncCommand>,
// Sending half of the unbounded channel whose receiver is given to `dirtylog::dirtylog_task`.
dirtylog_cmd_tx: UnboundedSender<dirtylog::DirtyLogCommand>,
// Set to `true` by the signal handlers installed in `make_client`, right before the client is terminated.
app_closing: AtomicBool,
}
/// Futures for the background tasks started by the init callback that
/// `make_client` returns.
///
/// All slots start as `None` (via `Default`) and are filled in when that
/// callback runs; `wait_until_finished` consumes them.
#[derive(Default)]
pub struct AppTasks {
// Resolves once the spawned `sites::sites_task` has been joined.
sites_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
// Resolves once the spawned `sync::sync_task` has been joined.
sync_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
// Resolves once the spawned `dirtylog::dirtylog_task` has been joined.
dirtylog_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl AppTasks {
    /// Waits for the background tasks to finish, one after another.
    ///
    /// The order is: the sites task, then the sync task, then the dirty-log
    /// task. The sync command channel is closed only after the sites task has
    /// ended, and the dirty-log command channel only after the sync task has
    /// ended. Slots that were never populated are skipped, but the channels
    /// are closed regardless.
    pub async fn wait_until_finished(self, app_state: &State) {
        let Self {
            sites_task: sites,
            sync_task: sync,
            dirtylog_task: dirtylog,
        } = self;

        if let Some(pending) = sites {
            pending.await;
        }

        app_state.sync_cmd_tx.close_channel();
        if let Some(pending) = sync {
            pending.await;
        }

        app_state.dirtylog_cmd_tx.close_channel();
        if let Some(pending) = dirtylog {
            pending.await;
        }
    }
}
/// Prepares everything needed to run the application's SHV client.
///
/// The function creates the journal directory, builds the shared [`State`],
/// and mounts `tree::request_handler` on the root path of a new client.
///
/// It returns the state, the client and an init callback. When invoked with
/// the client's command sender and events receiver, the callback:
///
/// * installs SIGINT and SIGTERM handlers that set `app_closing` and
///   terminate the client,
/// * spawns the sites, sync and dirty-log tasks and stores futures that
///   join them into `tasks`.
///
/// The callback uses `tokio::spawn` and `tokio::signal`, so it has to be
/// invoked from within a Tokio runtime.
///
/// # Errors
///
/// Returns an error when the journal directory cannot be created or when its
/// path cannot be canonicalized.
// The return type bundles the state, the client and an opaque closure; it is
// only spelled out here, so a dedicated alias would not make it more readable.
#[allow(clippy::type_complexity)]
pub fn make_client(hp_config: &HpConfig, tasks: &mut AppTasks) -> shvrpc::Result<(Arc<State>, shvclient::Client<Full>, impl FnOnce(shvclient::ClientCommandSender, shvclient::ClientEventsReceiver))> {
    info!("Max journal dir size: {}", &hp_config.max_journal_dir_size);
    info!("Setting up journal dir: {}", &hp_config.journal_dir);
    std::fs::create_dir_all(&hp_config.journal_dir)?;
    // Canonicalization is an I/O operation that may fail; report the failure
    // through the `Result` like `create_dir_all` above instead of panicking.
    // Resolving the path outside of the log macro also keeps the outcome
    // independent of the active log level.
    let journal_dir_path = std::fs::canonicalize(&hp_config.journal_dir)?;
    info!("Journal dir path: {}", journal_dir_path.to_string_lossy());

    let (sync_cmd_tx, sync_cmd_rx) = crate::util::dedup_channel();
    let (dirtylog_cmd_tx, dirtylog_cmd_rx) = unbounded();

    let app_state = Arc::new(State {
        start_time: std::time::Instant::now(),
        sites_data: RwLock::default(),
        sync_info: Default::default(),
        alarms: Default::default(),
        state_alarms: Default::default(),
        online_states: Default::default(),
        config: hp_config.clone(),
        sync_cmd_tx,
        dirtylog_cmd_tx,
        app_closing: AtomicBool::new(false),
    });

    // All requests are routed to the tree handler, which gets its own handle
    // to the shared state on every call.
    let client = shvclient::Client::new()
        .mount_dynamic("", {
            let app_state = app_state.clone();
            move |rq, cmd_sender|
                tree::request_handler(rq, cmd_sender, app_state.clone())
        });

    let init_function = {
        let app_state = app_state.clone();
        move |client_cmd_tx: shvclient::ClientCommandSender, client_evt_rx: shvclient::ClientEventsReceiver| {
            // Turns a join handle into a boxed future that logs a join
            // failure instead of propagating it.
            let wrap_task = |handle| Box::pin(async move {
                if let Err(task_result) = handle.await {
                    log::error!("Failed to join task: {task_result}");
                }
            });

            // On the first delivery of the given signal, flag the shutdown
            // and ask the client to terminate.
            let handle_signal = |signal_kind| {
                let app_state = app_state.clone();
                let client_cmd_tx = client_cmd_tx.clone();
                match signal(signal_kind) {
                    Ok(mut stream) => {
                        tokio::spawn(async move {
                            stream.recv().await;
                            app_state.app_closing.store(true, std::sync::atomic::Ordering::Relaxed);
                            client_cmd_tx.terminate_client();
                        });
                    },
                    Err(err) => log::error!("Failed to install signal handler: {err}"),
                }
            };
            handle_signal(SignalKind::interrupt());
            handle_signal(SignalKind::terminate());

            tasks.sites_task = Some(wrap_task(tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone()))));
            tasks.sync_task = Some(wrap_task(tokio::spawn(sync::sync_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sync_cmd_rx))));
            tasks.dirtylog_task = Some(wrap_task(tokio::spawn(dirtylog::dirtylog_task(client_cmd_tx, client_evt_rx, app_state.clone(), dirtylog_cmd_rx))));
        }
    };

    Ok((app_state, client, init_function))
}
/// Runs the application: sets up the client via `make_client`, drives it
/// until it stops, and then waits for the background tasks to finish.
///
/// # Errors
///
/// Returns an error when `make_client` fails or when the client run ends
/// with an error. In the latter case the function returns without waiting
/// for the background tasks.
pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::Result<()> {
    let mut background_tasks = AppTasks::default();
    let (shared_state, rpc_client, on_client_init) = make_client(hp_config, &mut background_tasks)?;

    let client_outcome = rpc_client.run_with_init(client_config, on_client_init).await;
    client_outcome?;

    background_tasks.wait_until_finished(&shared_state).await;
    Ok(())
}