historyprovider 2.10.1

historyprovider-rs
Documentation
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use futures::channel::mpsc::{UnboundedSender, unbounded};
use log::info;
use serde::Deserialize;
use shvclient::client::Full;
use shvproto::RpcValue;
use shvrpc::client::ClientConfig;
use size::Size;
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::RwLock;

use crate::alarm::Alarm;
use self::sites::SiteOnlineStatus;
use self::util::DedupSender;

mod getlog;
mod alarmlog;
mod pushlog;
mod sites;
mod tree;
mod sync;
mod cleanup;
mod dirtylog;
mod util;
pub mod alarm;

pub use crate::util::init_logger;

/// Serde fallback for [`HpConfig::max_sync_tasks`] when the key is absent from the config.
const fn max_sync_tasks_default() -> usize {
    8
}
/// Serde fallback for [`HpConfig::max_journal_dir_size`]: 30 GiB.
const fn max_journal_dir_size_default() -> Size {
    Size::from_const(30 * size::GiB)
}
/// Serde fallback for [`HpConfig::periodic_sync_interval`]: one hour, expressed in seconds.
const fn periodic_sync_interval_default() -> u64 {
    const SECONDS_PER_MINUTE: u64 = 60;
    const MINUTES_PER_HOUR: u64 = 60;
    SECONDS_PER_MINUTE * MINUTES_PER_HOUR
}

/// Serde fallback for [`HpConfig::journal_dir`] when the key is absent from the config.
fn journal_dir_default() -> String {
    String::from("/tmp/hp-rs/shvjournal")
}

/// Application configuration, deserialized from the config file (see [`HpConfig::load`]).
///
/// Every field is optional in the file; a missing key falls back to the default
/// named in its `#[serde(default ...)]` attribute.
#[derive(Clone, Deserialize)]
pub struct HpConfig {
    /// Path of the journal directory. It is created on startup if it does not exist.
    /// Defaults to `/tmp/hp-rs/shvjournal`.
    #[serde(default = "journal_dir_default")]
    pub journal_dir: String,
    /// Defaults to 8.
    // NOTE(review): presumably the upper bound on concurrently running sync tasks - confirm in `sync`.
    #[serde(default = "max_sync_tasks_default")]
    pub max_sync_tasks: usize,
    /// Size limit for the journal directory. Defaults to 30 GiB.
    // NOTE(review): how the limit is enforced is not visible here - presumably by `cleanup`; confirm.
    #[serde(default = "max_journal_dir_size_default")]
    pub max_journal_dir_size: Size,
    /// Interval of the periodic sync, in seconds. Defaults to 3600 (1 hour).
    #[serde(default = "periodic_sync_interval_default")]
    pub periodic_sync_interval: u64,
    /// Defaults to 0 when absent from the config.
    // NOTE(review): presumably the retention period in days, with 0 having a special meaning - confirm.
    #[serde(default)]
    pub days_to_keep: i64,
}

/// Mirrors the serde defaults, so `HpConfig::default()` has the same field values
/// as a config file that sets none of the keys.
impl Default for HpConfig {
    fn default() -> Self {
        let journal_dir = journal_dir_default();
        let max_sync_tasks = max_sync_tasks_default();
        let max_journal_dir_size = max_journal_dir_size_default();
        let periodic_sync_interval = periodic_sync_interval_default();

        Self {
            journal_dir,
            max_sync_tasks,
            max_journal_dir_size,
            periodic_sync_interval,
            days_to_keep: i64::default(),
        }
    }
}

impl HpConfig {
    /// Reads `config_file` and parses its contents as YAML into an [`HpConfig`].
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the file cannot be read
    /// (`Config file read error: ...`) or when its contents do not deserialize
    /// (`Config file format error: ...`).
    pub fn load(config_file: impl AsRef<std::path::Path>) -> Result<Self, String> {
        match std::fs::read_to_string(config_file) {
            Ok(contents) => serde_yaml_ng::from_str(&contents)
                .map_err(|err| format!("Config file format error: {err}")),
            Err(err) => Err(format!("Config file read error: {err}")),
        }
    }
}

/// An [`Alarm`] paired with a timestamp and a staleness flag.
///
/// Converts into an `RpcValue` map: the alarm's own map plus the `timestamp`
/// and `stale` keys.
#[derive(Clone)]
struct AlarmWithTimestamp {
    /// The alarm itself.
    alarm: Alarm,
    /// Timestamp attached to the alarm.
    // NOTE(review): presumably the time the alarm was raised or last updated - confirm at the producer.
    timestamp: shvproto::DateTime,
    /// Staleness flag, exported under the `stale` key.
    // NOTE(review): the exact condition that marks an alarm stale is not visible here - confirm.
    stale: bool,
}

impl From<AlarmWithTimestamp> for RpcValue {
    fn from(value: AlarmWithTimestamp) -> Self {
        let mut alarm_map = value.alarm.into_rpc_map(true);
        alarm_map.insert("timestamp".to_string(), value.timestamp.into());
        alarm_map.insert("stale".to_string(), value.stale.into());
        alarm_map.into()
    }
}

/// Shared application state, created once in [`make_client`] and handed out as `Arc<State>`
/// to the request handler and to the background tasks.
pub struct State {
    /// Instant captured when the state was constructed in `make_client`.
    start_time: std::time::Instant,
    /// Sites data behind an async read-write lock; starts out as the default value.
    sites_data: RwLock<sites::SitesData>,
    /// Sync bookkeeping; starts out as the default value.
    sync_info: sync::SyncInfo,
    /// Alarms with timestamps, grouped under string keys.
    // NOTE(review): keys are presumably site paths - confirm against the code that fills the map.
    alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
    /// Second alarm map with the same shape as `alarms`.
    // NOTE(review): how "state alarms" differ from `alarms` is not visible here - confirm.
    state_alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
    /// Online status, stored under string keys.
    // NOTE(review): keys are presumably site paths - confirm against the code that fills the map.
    online_states: RwLock<BTreeMap<String, SiteOnlineStatus>>,
    /// Copy of the configuration the application was started with.
    config: HpConfig,
    /// Sending side of the sync command channel; the receiver is given to the sync task.
    /// Closed in `AppTasks::wait_until_finished`.
    sync_cmd_tx: DedupSender<sync::SyncCommand>,
    /// Sending side of the dirty-log command channel; the receiver is given to the dirtylog task.
    /// Closed in `AppTasks::wait_until_finished`.
    dirtylog_cmd_tx: UnboundedSender<dirtylog::DirtyLogCommand>,
    /// Set to `true` by the SIGINT/SIGTERM handlers just before the client is asked to terminate.
    app_closing: AtomicBool,
}

/// Completion futures of the background tasks.
///
/// All fields start as `None` and are filled in by the init function returned from
/// [`make_client`]. Each future resolves when the corresponding spawned task has been
/// joined; a join failure is logged rather than returned.
#[derive(Default)]
pub struct AppTasks {
    /// Resolves when the task spawned from `sites::sites_task` has finished.
    sites_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// Resolves when the task spawned from `sync::sync_task` has finished.
    sync_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// Resolves when the task spawned from `dirtylog::dirtylog_task` has finished.
    dirtylog_task: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl AppTasks {
    /// Waits for the background tasks to finish, one after another.
    ///
    /// The order is fixed: the sites task is awaited first, then the sync command
    /// channel is closed and the sync task awaited, and finally the dirty-log command
    /// channel is closed and the dirtylog task awaited. Tasks that were never started
    /// (`None`) are skipped, but the channels are closed regardless.
    pub async fn wait_until_finished(self, app_state: &State) {
        let Self { sites_task, sync_task, dirtylog_task } = self;

        if let Some(sites) = sites_task {
            sites.await;
        }

        // NOTE(review): presumably closing the channel is what lets the sync task exit - confirm in `sync`.
        app_state.sync_cmd_tx.close_channel();
        if let Some(sync) = sync_task {
            sync.await;
        }

        // NOTE(review): presumably closing the channel is what lets the dirtylog task exit - confirm in `dirtylog`.
        app_state.dirtylog_cmd_tx.close_channel();
        if let Some(dirtylog) = dirtylog_task {
            dirtylog.await;
        }
    }
}

/// Prepares everything needed to run the application: the shared [`State`], the SHV
/// client with the request handler mounted at the root path, and an init function to
/// be called once the client provides its command sender and events receiver.
///
/// The init function installs SIGINT and SIGTERM handlers (each sets
/// `State::app_closing` and asks the client to terminate) and spawns the sites, sync
/// and dirtylog tasks, storing their completion futures in `tasks`.
///
/// # Errors
///
/// Returns an error when the journal directory cannot be created or when its path
/// cannot be canonicalized.
// The return type is a tuple with an `impl FnOnce`; it cannot be named in a type alias.
#[allow(clippy::type_complexity)]
pub fn make_client(hp_config: &HpConfig, tasks: &mut AppTasks) -> shvrpc::Result<(Arc<State>, shvclient::Client<Full>, impl FnOnce(shvclient::ClientCommandSender, shvclient::ClientEventsReceiver))> {
    info!("Max journal dir size: {}", &hp_config.max_journal_dir_size);
    info!("Setting up journal dir: {}", &hp_config.journal_dir);
    std::fs::create_dir_all(&hp_config.journal_dir)?;
    // Canonicalization is an I/O operation and may fail (e.g. permissions, or the directory
    // being removed concurrently). Report that as an error instead of panicking.
    let journal_dir_path = std::fs::canonicalize(&hp_config.journal_dir)
        .map_err(|err| std::io::Error::new(err.kind(), format!("Invalid journal dir: {err}")))?;
    info!("Journal dir path: {}", journal_dir_path.to_string_lossy());

    let (sync_cmd_tx, sync_cmd_rx) = crate::util::dedup_channel();
    let (dirtylog_cmd_tx, dirtylog_cmd_rx) = unbounded();

    let app_state = Arc::new(State {
        start_time: std::time::Instant::now(),
        sites_data: RwLock::default(),
        sync_info: Default::default(),
        alarms: Default::default(),
        state_alarms: Default::default(),
        online_states: Default::default(),
        config: hp_config.clone(),
        sync_cmd_tx,
        dirtylog_cmd_tx,
        app_closing: AtomicBool::new(false),
    });

    // Every request is routed to the tree handler, which gets its own handle to the state.
    let client = shvclient::Client::new()
        .mount_dynamic("", {
            let app_state = app_state.clone();
            move |rq, cmd_sender|
                tree::request_handler(rq, cmd_sender, app_state.clone())
        });

    let init_function = {
        let app_state = app_state.clone();
        move |client_cmd_tx: shvclient::ClientCommandSender, client_evt_rx: shvclient::ClientEventsReceiver| {
            // Turns a join handle into a boxed future that logs a join failure instead of returning it.
            let wrap_task = |handle| Box::pin(async move {
                if let Err(task_result) = handle.await {
                    log::error!("Failed to join task: {task_result}");
                }
            });
            // Spawns a task that waits for one delivery of the given signal, then flags the
            // shutdown and asks the client to terminate.
            let handle_signal = |signal_kind| {
                let app_state = app_state.clone();
                let client_cmd_tx = client_cmd_tx.clone();
                match signal(signal_kind) {
                    Ok(mut stream) => {
                        tokio::spawn(async move {
                            stream.recv().await;
                            // Relaxed is enough - we are not protecting any data by this atomic variable.
                            app_state.app_closing.store(true, std::sync::atomic::Ordering::Relaxed);
                            client_cmd_tx.terminate_client();
                        });
                    },
                    Err(err) => log::error!("Failed to install signal handler: {err}"),
                }
            };

            handle_signal(SignalKind::interrupt());
            handle_signal(SignalKind::terminate());

            tasks.sites_task = Some(wrap_task(tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone()))));
            tasks.sync_task = Some(wrap_task(tokio::spawn(sync::sync_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sync_cmd_rx))));
            tasks.dirtylog_task = Some(wrap_task(tokio::spawn(dirtylog::dirtylog_task(client_cmd_tx, client_evt_rx, app_state.clone(), dirtylog_cmd_rx))));
        }
    };

    Ok((app_state, client, init_function))
}

/// Runs the application: builds the client via [`make_client`], runs it with the init
/// function until the client finishes, then waits for the background tasks to complete.
///
/// # Errors
///
/// Propagates errors from `make_client` and from running the client. In both cases the
/// function returns early, without waiting for the background tasks.
pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::Result<()> {
    let mut background_tasks = AppTasks::default();
    let (state, shv_client, on_client_init) = make_client(hp_config, &mut background_tasks)?;

    shv_client.run_with_init(client_config, on_client_init).await?;

    background_tasks.wait_until_finished(&state).await;
    Ok(())
}