// assistant_daemon 0.1.0
//
// Daemon program for providing many features.
use crate::generated::{Config, Feature, FeatureSettings, Features, SshpmSetting, WpSetting};
use anyhow::{Context, Result};
use async_trait::async_trait;
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use serde_json;
use std::fs::File;
use std::future::Future;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info};

/// Directory for holding configuration and feature's data.
///
/// The directory is `.devhelp` inside the current user's home directory. This
/// function only builds the path; it does not create anything on disk.
///
/// # Panics
///
/// Panics if the home directory of the current user cannot be determined.
pub fn get_config_dir() -> PathBuf {
    dirs::home_dir()
        .expect("cannot locate the config directory: the user's home directory is unknown")
        .join(".devhelp")
}

/// Create config directory if it does not exist.
///
/// Missing parent directories are created as well. Calling this on a directory
/// that already exists is a no-op.
///
/// # Errors
///
/// Returns the underlying I/O error when the directory cannot be created, for
/// example because of missing permissions or because `dir` (or one of its
/// parents) exists but is not a directory.
pub fn init_dir(dir: &Path) -> Result<()> {
    // `create_dir_all` already succeeds when the directory exists, so no prior
    // `exists()` check is needed; skipping it also avoids a check-then-create
    // race and stops a regular file at `dir` from being mistaken for success.
    std::fs::create_dir_all(dir)?;
    Ok(())
}

/// Directory holding the data of a single feature.
///
/// The result is the config directory (see `get_config_dir`) extended with
/// `feature` as the final path component. Nothing is created on disk.
pub fn get_feature_dir(feature: &str) -> PathBuf {
    let mut feature_dir = get_config_dir();
    feature_dir.push(feature);
    feature_dir
}

/// Shared, thread-safe callback notified about a configuration change.
///
/// The first argument is the previous configuration (`None` when there was
/// none), the second is the new configuration. The callback returns a boxed
/// `Send` future that performs the actual asynchronous work.
type ConfigChangeCallback<C> =
    Arc<dyn Fn(Option<C>, C) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

/// Asynchronous store for a configuration value of type `C` with change
/// notification.
#[async_trait]
pub trait ConfigManager<C: DeserializeOwned + Serialize + Default + Send + Sync + Clone>:
    Send + Sync
{
    /// Returns the current configuration, or `None` when no configuration is
    /// available.
    async fn get(&self) -> Result<Option<C>>;
    /// Makes `new_config` the current configuration.
    async fn update(&self, new_config: C) -> Result<()>;
    /// Registers `cb` as a listener for configuration changes.
    fn on(&self, cb: ConfigChangeCallback<C>);
}

/// Config manager implemented by the simple file system.
///
/// The configuration is stored as a single file inside `config_dir` and
/// mirrored in an in-memory cache.
pub struct FsConfigManager<C: DeserializeOwned + Serialize + Default + Send + Clone + PartialEq> {
    /// Most recently stored configuration; `None` until the first update.
    cache: RwLock<Option<C>>,
    /// Directory that contains the config file.
    config_dir: PathBuf,
    /// Listeners registered through `on`, guarded by a synchronous lock.
    callbacks: std::sync::RwLock<Vec<ConfigChangeCallback<C>>>,
    /// Name of the config file inside `config_dir`.
    config_file_name: String,
    /// File-system watcher for the config file; `None` while watching is off.
    fs_watcher: RwLock<Option<RecommendedWatcher>>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            settings: FeatureSettings {
                wp: Default::default(),
                sshpm: Default::default(),
            },
            features: Default::default(),
            detect_lidchange: None,
            run_at_start_up: None,
        }
    }
}

impl Default for SshpmSetting {
    fn default() -> Self {
        Self {
            lpmappings: Default::default(),
        }
    }
}

impl Default for Features {
    fn default() -> Self {
        Self {
            wp: Feature { status: true },
        }
    }
}

impl<
        C: DeserializeOwned
            + Serialize
            + Default
            + Send
            + Sync
            + Clone
            + PartialEq
            + std::fmt::Debug
            + 'static,
    > FsConfigManager<C>
{
    pub fn new(config_dir: PathBuf, config_file_name: String) -> Self {
        Self {
            cache: Default::default(),
            config_dir,
            config_file_name,
            callbacks: Default::default(),
            fs_watcher: Default::default(),
        }
    }

    pub async fn init(&self, default_config: Option<C>) -> Result<()> {
        let config_path = self.config_dir.join(&self.config_file_name);
        let config = if config_path.exists() {
            debug!("found existing config file: {:?}", config_path);

            let mut file = File::open(&config_path).context("failed to open config file")?;
            let mut contents = String::new();
            file.read_to_string(&mut contents)
                .context("failed to read config file")?;
            serde_json::from_str(&contents).context("failed to parse config file")?
        } else {
            match default_config {
                Some(config) => config,
                None => C::default(),
            }
        };
        self.update(config).await?;
        Ok(())
    }

    pub async fn init_fs_watch(self: &Arc<Self>) -> Result<()> {
        let config_file = self.config_dir.join(&self.config_file_name);

        let config_file_copy = config_file.clone();
        let self_clone = self.clone();
        let runtime = tokio::runtime::Handle::current();
        let watcher =
            notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
                let config_file = config_file_copy.clone();
                let self_clone = self_clone.clone();
                let runtime = runtime.clone();
                match res {
                    Ok(e) => {
                        if e.kind.is_modify() {
                            runtime.spawn(async move {
                                match self_clone.update_from_file().await {
                                    Ok(_) => {}
                                    Err(err) => {
                                        error!(
                                            "update from file {:?} error: {:?}",
                                            config_file, err
                                        )
                                    }
                                }
                            });
                        }
                    }
                    Err(err) => error!("watch {:?} error: {:?}", config_file, err),
                };
            })?;

        *self.fs_watcher.write().await = Some(watcher);

        Ok(())
    }

    pub async fn turn_on_fs_watch(self: &Arc<Self>) -> Result<()> {
        let config_file: PathBuf = self.config_dir.join(&self.config_file_name);

        let need_init = { self.fs_watcher.read().await.is_none() };

        if need_init {
            self.init_fs_watch().await?;
        }

        self.fs_watcher
            .write()
            .await
            .as_mut()
            .unwrap()
            .watch(&config_file, RecursiveMode::NonRecursive)?;

        Ok(())
    }

    /// Whether it should listen to the file system change to the config file
    pub async fn set_fs_watch(self: &Arc<Self>, flag: bool) -> Result<()> {
        let watcher_exist = self.fs_watcher.read().await.is_some();

        if watcher_exist {
            if !flag {
                *self.fs_watcher.write().await = None;
            } else {
                self.turn_on_fs_watch().await?;
            }
        } else {
            if flag {
                self.turn_on_fs_watch().await?;
            }
        }

        Ok(())
    }

    async fn save_to_file(&self, new_config: &C) -> Result<()> {
        debug!("save new config to file: {:?}", new_config);

        let config_path = self.config_dir.join(&self.config_file_name);
        let contents =
            serde_json::to_string_pretty(new_config).context("failed to serialize config")?;
        let mut file = File::create(&config_path).context("failed to create config file")?;
        file.write_all(contents.as_bytes())
            .context("failed to write config file")?;

        Ok(())
    }

    async fn update_from_file(&self) -> Result<()> {
        let config_path = self.config_dir.join(&self.config_file_name);
        if !config_path.exists() {
            return self.init(None).await;
        }
        let mut buffer = String::new();
        File::open(config_path)?.read_to_string(&mut buffer)?;
        let config: C = serde_json::from_str(&buffer)?;
        self.update(config).await?;
        Ok(())
    }
}

#[async_trait]
impl<
        C: DeserializeOwned
            + Serialize
            + Default
            + Send
            + Sync
            + Clone
            + PartialEq
            + std::fmt::Debug
            + 'static,
    > ConfigManager<C> for FsConfigManager<C>
{
    async fn get(&self) -> Result<Option<C>> {
        let cache = self.cache.read().await;
        Ok(cache.clone())
    }

    async fn update(&self, new_config: C) -> Result<()> {
        debug!("update config: {:?}", new_config);
        let mut cache_lock = self.cache.write().await;

        // skip updating if old equal to new
        match &*cache_lock {
            Some(cache) => {
                if &new_config == cache {
                    return Ok(());
                }
            }
            None => {}
        };

        let old_config = cache_lock.clone();

        self.save_to_file(&new_config).await?;
        *cache_lock = Some(new_config.clone());

        for cb in &*(self.callbacks.read().unwrap()) {
            let new_config = new_config.clone();
            let old_config = old_config.clone();
            let cb = cb.clone();
            tokio::spawn(async move { cb(old_config, new_config).await });
        }

        Ok(())
    }

    fn on(&self, cb: ConfigChangeCallback<C>) {
        self.callbacks.write().unwrap().push(cb);
    }
}