signer-daemon 0.2.3

Signer daemon package.
Documentation
use std::{
  collections::{HashMap, HashSet},
  fs,
  path::Path,
  sync::Arc,
};

use anyhow::Context;
use migration::MigratorTrait;
use sea_orm::{
  ColumnTrait, Database, DatabaseConnection, EntityTrait as _, QueryFilter as _,
};

use signer_core::{SignerFsStore, SignerUser};
use tokio::sync::{Mutex, broadcast};

use crate::{entity::server, signer_connection::SignerConnection};

use super::signer_daemon_event::SignerDaemonEvent;

#[derive(Debug, Clone)]
pub struct SignerDaemon {
  pub user: Arc<Mutex<SignerUser>>,
  pub db: DatabaseConnection,
  pub peer: String,
  pub connections: Arc<Mutex<HashMap<String, Arc<SignerConnection>>>>,
  pub(crate) user_file_path: Option<String>,
  pub(crate) event_sender: broadcast::Sender<SignerDaemonEvent>,
  pub(crate) event_receiver: broadcast::Sender<SignerDaemonEvent>,
}

impl SignerDaemon {
  pub async fn from_memory(
    user: &SignerUser,
    peer: &str,
  ) -> anyhow::Result<Self> {
    let db = Database::connect("sqlite::memory:").await?;

    migration::Migrator::up(&db, None)
      .await
      .context("running migration failed")?;

    let (event_sender, _) = broadcast::channel(64);
    let (event_receiver, _) = broadcast::channel(64);
    let daemon = Self {
      db,
      peer: peer.to_string(),
      user: Arc::new(Mutex::new(user.clone())),
      connections: Arc::new(Mutex::new(HashMap::new())),
      user_file_path: None,
      event_sender,
      event_receiver,
    };

    daemon.start_watch_event();
    daemon.emit(SignerDaemonEvent::ApplyEvent)?;
    daemon.emit(SignerDaemonEvent::UpdateConnections)?;

    Ok(daemon)
  }

  pub async fn from_path(
    user: &SignerUser,
    path: &str,
    user_file_name: &str,
  ) -> anyhow::Result<Self> {
    let db_url = format!("sqlite://{}/signer.sqlite3?mode=rwc", path);
    let db = Database::connect(&db_url).await?;

    migration::Migrator::up(&db, None)
      .await
      .context("running migration failed")?;

    let peer = match fs::read_to_string(Path::new(path).join("peer")) {
      Ok(val) => val,
      Err(_) => {
        let new_peer = uuid::Uuid::new_v4().to_string();
        fs::write(Path::new(path).join("peer"), &new_peer)?;

        new_peer
      }
    };

    let (event_sender, _) = broadcast::channel(64);
    let (event_receiver, _) = broadcast::channel(64);
    let daemon = Self {
      db,
      peer,
      user: Arc::new(Mutex::new(user.clone())),
      connections: Arc::new(Mutex::new(HashMap::new())),
      user_file_path: Some(
        Path::new(path)
          .join(user_file_name)
          .to_str()
          .unwrap()
          .to_string(),
      ),
      event_sender,
      event_receiver,
    };

    daemon.start_watch_event();
    daemon.emit(SignerDaemonEvent::ApplyEvent)?;
    daemon.emit(SignerDaemonEvent::UpdateConnections)?;

    Ok(daemon)
  }

  pub async fn from_sfs(
    user: &SignerUser,
    sfs: &SignerFsStore,
  ) -> anyhow::Result<Self> {
    SignerDaemon::from_path(
      user,
      sfs.root.join(&user.public.pub_key).to_str().unwrap(),
      "user.json",
    )
    .await
  }

  pub async fn get_connection(
    &self,
    addr: &str,
  ) -> Option<Arc<SignerConnection>> {
    self.connections.lock().await.get(addr).map(|i| i.clone())
  }

  pub async fn update_connections(&self) -> anyhow::Result<()> {
    let mut remove_set = self
      .connections
      .lock()
      .await
      .keys()
      .map(|i| i.to_string())
      .collect::<HashSet<String>>();

    let servers = server::Entity::find()
      .filter(server::Column::Enable.eq(true))
      .all(&self.db)
      .await?;
    for server in servers {
      if remove_set.remove(&server.addr.to_string()) {
        continue;
      }

      self.connections.lock().await.insert(
        server.addr.to_string(),
        SignerConnection::new(&self, &server).await,
      );
    }

    for remove in remove_set {
      if let Some(rm) = self.connections.lock().await.remove(&remove) {
        rm.close().await;
      }
    }

    Ok(())
  }

  pub async fn finalize(&self) -> anyhow::Result<()> {
    tracing::info!(
      "closing database {}",
      self.user.lock().await.public.pub_key
    );

    let conns = self
      .connections
      .lock()
      .await
      .values()
      .map(|i| i.clone())
      .collect::<Vec<_>>();

    for conn in conns {
      conn.close().await;
    }

    self.db.close_by_ref().await?;

    Ok(())
  }
}