use std::{
collections::{HashMap, HashSet},
fs,
path::Path,
sync::Arc,
};
use anyhow::Context;
use migration::MigratorTrait;
use sea_orm::{
ColumnTrait, Database, DatabaseConnection, EntityTrait as _, QueryFilter as _,
};
use signer_core::{SignerFsStore, SignerUser};
use tokio::sync::{Mutex, broadcast};
use crate::{entity::server, signer_connection::SignerConnection};
use super::signer_daemon_event::SignerDaemonEvent;
/// Handle to a running signer daemon: the user it acts for, its database,
/// and the set of live server connections.
///
/// The type is `Clone`; the `user` and `connections` fields are behind
/// `Arc<Mutex<..>>`, so clones share that state.
#[derive(Debug, Clone)]
pub struct SignerDaemon {
/// The user this daemon operates for, shared behind an async mutex.
/// The constructors store a clone of the `SignerUser` they are given.
pub user: Arc<Mutex<SignerUser>>,
/// sea-orm connection to the daemon's SQLite database. The constructors
/// run the migrations on it before the daemon is returned.
pub db: DatabaseConnection,
/// Peer identifier. `from_memory` takes it from the caller; `from_path`
/// reads it from the `peer` file or generates a fresh UUID v4.
pub peer: String,
/// Live connections, keyed by server address (see `update_connections`).
pub connections: Arc<Mutex<HashMap<String, Arc<SignerConnection>>>>,
/// Location of the user file: `None` for in-memory daemons,
/// `Some(<path>/<user_file_name>)` for daemons created by `from_path`.
pub(crate) user_file_path: Option<String>,
/// Sending half of a broadcast channel carrying `SignerDaemonEvent`s.
pub(crate) event_sender: broadcast::Sender<SignerDaemonEvent>,
/// NOTE(review): despite the name this is a `Sender`, and the constructors
/// fill it from a second, separate broadcast channel. How it is consumed is
/// not visible in this file — confirm the intent against
/// `start_watch_event` / `emit`.
pub(crate) event_receiver: broadcast::Sender<SignerDaemonEvent>,
}
impl SignerDaemon {
/// Creates a daemon backed by an in-memory SQLite database.
///
/// The database is opened at `sqlite::memory:` and migrated, two broadcast
/// channels (capacity 64) are created, the event watcher is started, and
/// the `ApplyEvent` and `UpdateConnections` events are emitted, in that
/// order. No user file path is recorded.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, if the migration
/// fails, or if emitting either initial event fails.
pub async fn from_memory(
    user: &SignerUser,
    peer: &str,
) -> anyhow::Result<Self> {
    let connection = Database::connect("sqlite::memory:").await?;
    migration::Migrator::up(&connection, None)
        .await
        .context("running migration failed")?;

    // Only the sending halves are kept; the initial receivers are dropped.
    let (sender, _) = broadcast::channel(64);
    let (secondary_sender, _) = broadcast::channel(64);

    let this = Self {
        user: Arc::new(Mutex::new(user.clone())),
        db: connection,
        peer: peer.to_owned(),
        connections: Arc::new(Mutex::new(HashMap::new())),
        user_file_path: None,
        event_sender: sender,
        event_receiver: secondary_sender,
    };

    this.start_watch_event();
    for event in [
        SignerDaemonEvent::ApplyEvent,
        SignerDaemonEvent::UpdateConnections,
    ] {
        this.emit(event)?;
    }
    Ok(this)
}
pub async fn from_path(
user: &SignerUser,
path: &str,
user_file_name: &str,
) -> anyhow::Result<Self> {
let db_url = format!("sqlite://{}/signer.sqlite3?mode=rwc", path);
let db = Database::connect(&db_url).await?;
migration::Migrator::up(&db, None)
.await
.context("running migration failed")?;
let peer = match fs::read_to_string(Path::new(path).join("peer")) {
Ok(val) => val,
Err(_) => {
let new_peer = uuid::Uuid::new_v4().to_string();
fs::write(Path::new(path).join("peer"), &new_peer)?;
new_peer
}
};
let (event_sender, _) = broadcast::channel(64);
let (event_receiver, _) = broadcast::channel(64);
let daemon = Self {
db,
peer,
user: Arc::new(Mutex::new(user.clone())),
connections: Arc::new(Mutex::new(HashMap::new())),
user_file_path: Some(
Path::new(path)
.join(user_file_name)
.to_str()
.unwrap()
.to_string(),
),
event_sender,
event_receiver,
};
daemon.start_watch_event();
daemon.emit(SignerDaemonEvent::ApplyEvent)?;
daemon.emit(SignerDaemonEvent::UpdateConnections)?;
Ok(daemon)
}
/// Creates a daemon for `user` inside the file-system store `sfs`.
///
/// The daemon directory is `<sfs.root>/<user public key>` and the user file
/// name is `user.json`; everything else is delegated to
/// [`SignerDaemon::from_path`].
///
/// # Errors
///
/// Returns an error if the daemon directory path is not valid UTF-8
/// (previously this panicked), or if [`SignerDaemon::from_path`] fails.
pub async fn from_sfs(
    user: &SignerUser,
    sfs: &SignerFsStore,
) -> anyhow::Result<Self> {
    let user_dir = sfs.root.join(&user.public.pub_key);
    let user_dir_str = user_dir
        .to_str()
        .context("signer store user directory is not valid UTF-8")?;
    SignerDaemon::from_path(user, user_dir_str, "user.json").await
}
/// Looks up the connection registered under `addr`.
///
/// Returns a new `Arc` handle to the connection, or `None` when no
/// connection is stored for that address.
pub async fn get_connection(
    &self,
    addr: &str,
) -> Option<Arc<SignerConnection>> {
    let connections = self.connections.lock().await;
    connections.get(addr).cloned()
}
pub async fn update_connections(&self) -> anyhow::Result<()> {
let mut remove_set = self
.connections
.lock()
.await
.keys()
.map(|i| i.to_string())
.collect::<HashSet<String>>();
let servers = server::Entity::find()
.filter(server::Column::Enable.eq(true))
.all(&self.db)
.await?;
for server in servers {
remove_set.remove(&server.addr.to_string());
if self
.connections
.lock()
.await
.contains_key(&server.addr.to_string())
{
continue;
}
self.connections.lock().await.insert(
server.addr.to_string(),
SignerConnection::new(&self, &server).await,
);
}
for remove in remove_set {
if let Some(rm) = self.connections.lock().await.remove(&remove) {
rm.close();
}
}
Ok(())
}
/// Shuts the daemon down: logs the user's public key, closes every
/// registered connection, then closes the database connection.
///
/// The connections stay in the map; they are only closed.
///
/// # Errors
///
/// Returns an error if closing the database connection fails.
pub async fn finalize(&self) -> anyhow::Result<()> {
    tracing::info!(
        "closing database {}",
        self.user.lock().await.public.pub_key
    );

    {
        // The guard is released at the end of this scope, before the
        // database is closed.
        let connections = self.connections.lock().await;
        for connection in connections.values() {
            connection.close();
        }
    }

    self.db.close_by_ref().await?;
    Ok(())
}
}