#![warn(missing_docs)]
use chrono::Local;
use std::collections::{HashMap, VecDeque};
use std::error::Error;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;
use std::sync::mpsc::{Receiver, Sender};
use crate::services::{map_comm_pairs, Service};
use std::thread;
/// Home of the `Service` trait, the `map_comm_pairs` wiring helper, and the
/// concrete services (for example `TestService` and `Database`).
pub mod services;
/// A unit of communication exchanged between services.
///
/// A message records who wrote it, which registered identifier it is addressed
/// to, and its textual payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// Identifier of the recipient; `Messenger::route` looks this up.
    to: String,
    /// Name of the author, as passed to [`Message::new`].
    author: String,
    /// Textual payload of the message.
    content: String,
}

impl Message {
    /// Builds a message written by `name`, addressed to `to`, carrying `content`.
    ///
    /// Note the argument order: the author comes first, then the recipient.
    /// All three arguments are copied into owned `String`s.
    pub fn new(name: &str, to: &str, content: &str) -> Message {
        Message {
            to: to.to_owned(),
            author: name.to_owned(),
            content: content.to_owned(),
        }
    }
}
/// Routing table that maps an identifier to a pair of channel endpoints.
///
/// For every registered identifier the messenger stores a
/// `(Sender<Message>, Receiver<Message>)` pair. [`Messenger::route`] pushes a
/// message into the `Sender` belonging to the message's recipient; the
/// `Receiver` halves are polled by `Dispatcher::start`.
#[derive(Debug, Default)]
pub struct Messenger {
    /// Channel endpoints keyed by identifier.
    ids: HashMap<String, (Sender<Message>, Receiver<Message>)>,
}

impl Messenger {
    /// Creates a messenger with no registered identifiers.
    pub fn new() -> Self {
        Messenger {
            ids: HashMap::new(),
        }
    }

    /// Registers the channel pair for `name`, replacing any previous pair
    /// stored under the same name.
    pub fn set_comm_pair(&mut self, name: String, comm_pair: (Sender<Message>, Receiver<Message>)) {
        self.ids.insert(name, comm_pair);
    }

    /// Delivers `msg` to the identifier named in its `to` field.
    ///
    /// # Errors
    ///
    /// Returns an error when no pair is registered under `msg.to`, or when the
    /// receiving end of the recipient's channel has been dropped. Neither
    /// condition panics.
    pub fn route(&self, msg: Message) -> Result<(), Box<dyn Error>> {
        match self.ids.get(&msg.to) {
            Some((tx, _)) => {
                // `SendError` hands the undelivered message back, which lets
                // the error name the recipient that could not be reached.
                tx.send(msg).map_err(|undelivered| {
                    format!(
                        "Failed to deliver message to {}: receiver disconnected",
                        undelivered.0.to
                    )
                })?;
                Ok(())
            }
            None => Err(format!("ID not found in pairs: {}", msg.to).into()),
        }
    }
}
/// Appends timestamped lines to a log file.
///
/// The channel endpoints start out as `None`; they are filled in through the
/// `Service::set_comm_pair` implementation for this type.
#[derive(Debug)]
pub struct Logger {
    /// Identifier reported by `identify`; always `"logger"` for values built
    /// with [`Logger::new`].
    name: String,
    /// Incoming channel, if one has been installed.
    rx: Option<Receiver<Message>>,
    /// Outgoing channel, if one has been installed.
    tx: Option<Sender<Message>>,
    /// Path of the file that [`Logger::log`] appends to.
    log_path: String,
}

impl Logger {
    /// Creates a logger named `"logger"` that writes to `./logs/log` and has
    /// no channels attached.
    pub fn new() -> Logger {
        Logger {
            name: "logger".to_string(),
            log_path: "./logs/log".to_string(),
            tx: None,
            rx: None,
        }
    }

    /// Redirects output to `./logs/<path>` and returns the updated logger.
    ///
    /// `path` is appended to the `./logs/` prefix verbatim.
    pub fn set_path(mut self, path: &str) -> Self {
        self.log_path = format!("./logs/{}", path);
        self
    }

    /// Appends `message`, prefixed with the local time formatted as
    /// `YYYY-MM-DD HH:MM:SS`, as one line of the log file.
    ///
    /// Missing parent directories and the file itself are created on demand.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while creating the directories, opening
    /// the file, or writing the line.
    pub fn log(&self, message: &str) -> Result<(), Box<dyn Error>> {
        let log_file_path = Path::new(&self.log_path);
        if let Some(parent) = log_file_path.parent() {
            // `create_dir_all` already succeeds when the directory exists, so
            // a separate `exists()` probe would only add a check-then-act gap.
            fs::create_dir_all(parent)?;
        }

        let mut file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(log_file_path)?;

        // Build the whole line first and hand it over in a single write so
        // that concurrent appenders cannot interleave fragments of a line.
        let line = format!(
            "{} - {}\n",
            Local::now().format("%Y-%m-%d %H:%M:%S"),
            message
        );
        file.write_all(line.as_bytes())?;
        Ok(())
    }
}

impl Default for Logger {
    /// Equivalent to [`Logger::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Service for Logger {
    /// Receives messages until the sending side disconnects, printing each
    /// payload and appending it to the log file.
    ///
    /// A failure to write one entry is reported on stderr and does not stop
    /// the loop.
    ///
    /// # Errors
    ///
    /// Returns an error if no receiver has been installed with
    /// `set_comm_pair`; there would be nothing to wait on.
    fn run(&self) -> Result<(), Box<dyn Error>> {
        let rx = self
            .rx
            .as_ref()
            .ok_or_else(|| format!("{}: run() called before set_comm_pair()", self.name))?;

        // A blocking `recv` parks the thread while the channel is empty
        // instead of spinning on `try_recv`, and it fails once every sender
        // is gone, which ends the loop and lets the thread finish.
        while let Ok(msg) = rx.recv() {
            println!("Logger received msg: {}", msg.content);
            if let Err(e) = self.log(&msg.content) {
                eprintln!("Logger errored with error: {}", e);
            }
        }
        Ok(())
    }

    /// Sends `msg` through the outgoing channel.
    ///
    /// When no sender has been installed the message is dropped and `Ok(())`
    /// is returned.
    ///
    /// # Errors
    ///
    /// Returns the channel's send error if the receiving end is gone.
    fn send(&self, msg: Message) -> Result<(), Box<dyn Error>> {
        if let Some(tx) = &self.tx {
            tx.send(msg)?;
        }
        Ok(())
    }

    /// Returns a copy of this logger's name.
    fn identify(&self) -> String {
        self.name.clone()
    }

    /// Installs the channel endpoints used by `send` and `run`.
    fn set_comm_pair(&mut self, tx: Sender<Message>, rx: Receiver<Message>) {
        self.tx = Some(tx);
        self.rx = Some(rx);
    }
}
/// Owns the registered services, runs each on its own thread, and routes
/// messages between them.
pub struct Dispatcher {
    /// Table dumped by [`Dispatcher::log_table`]; nothing in this file
    /// inserts into it.
    service_ids: HashMap<String, u8>,
    /// Services waiting to be started by [`Dispatcher::start`].
    services: VecDeque<Box<dyn Service>>,
    /// Logger used by [`Dispatcher::log_table`].
    logger: Logger,
}

impl Dispatcher {
    /// Creates a dispatcher with no registered services and a default logger.
    pub fn new() -> Dispatcher {
        Dispatcher {
            service_ids: HashMap::new(),
            services: VecDeque::new(),
            logger: Logger::new(),
        }
    }

    /// Queues `service` to be started by [`Dispatcher::start`].
    ///
    /// Services are pushed to the front of the queue, so the most recently
    /// registered service is started first.
    pub fn register(&mut self, service: Box<dyn Service>) {
        self.services.push_front(service);
    }

    /// Starts every registered service on its own thread and routes messages
    /// between them until routing fails or every service has disconnected,
    /// then waits for the service threads to finish.
    ///
    /// # Panics
    ///
    /// Panics if a service thread panicked.
    pub fn start(mut self) {
        let mut handles = Vec::with_capacity(self.services.len());
        let mut messenger = Messenger::new();

        while let Some(mut service) = self.services.pop_front() {
            map_comm_pairs(&mut service, &mut messenger);
            handles.push(thread::spawn(move || {
                if let Err(e) = service.run() {
                    eprintln!("Service run failed: {}", e);
                }
            }));
        }

        // How long to rest after a pass that found no message to route.
        let idle_backoff = std::time::Duration::from_millis(1);

        'check_message: loop {
            let mut routed_any = false;
            let mut open_channels = 0usize;

            for (_, rx) in messenger.ids.values() {
                match rx.try_recv() {
                    Ok(msg) => {
                        routed_any = true;
                        open_channels += 1;
                        if let Err(e) = messenger.route(msg) {
                            eprintln!("Dispatcher messager caught the following error: {e}");
                            break 'check_message;
                        }
                    }
                    Err(std::sync::mpsc::TryRecvError::Empty) => open_channels += 1,
                    // Every sender for this receiver is gone and its buffer is
                    // drained; nothing more can arrive on it.
                    Err(std::sync::mpsc::TryRecvError::Disconnected) => {}
                }
            }

            // With no live channel left (or none registered) no message can
            // ever arrive again, so polling further would only spin.
            if open_channels == 0 {
                break 'check_message;
            }
            if !routed_any {
                // Avoid pinning a core while all channels are idle.
                thread::sleep(idle_backoff);
            }
        }

        // Release the dispatcher's channel endpoints before joining so that a
        // service blocked on its receiver can observe the disconnection.
        // NOTE(review): assumes `map_comm_pairs` hands each service the peer
        // ends of the pairs stored in `messenger` -- confirm.
        drop(messenger);

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Writes the debug representation of the service-id table to the log.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Logger::log`].
    pub fn log_table(&self) -> Result<(), Box<dyn Error>> {
        let table_state = format!("{:?}", self.service_ids);
        self.logger.log(&table_state)
    }
}

impl Default for Dispatcher {
    /// Equivalent to [`Dispatcher::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::TestService;
    use std::{fs, path::Path};

    /// Removes `logs/tests/<test_name>`, the directory owned by one test.
    ///
    /// Each test cleans only its own directory: the test harness runs tests
    /// in parallel, and wiping the shared `logs/tests` directory could delete
    /// another test's files while it is still writing them.
    fn clean(test_name: &str) -> Result<(), Box<dyn std::error::Error>> {
        let log_dir_path = Path::new("logs/tests").join(test_name);
        if log_dir_path.exists() {
            fs::remove_dir_all(&log_dir_path)?;
            println!("Successfully removed {}", log_dir_path.display());
        } else {
            println!("Directory {} does not exist.", log_dir_path.display());
        }
        Ok(())
    }

    /// Registers a logger and a test service and runs the dispatcher.
    #[test]
    fn test_dispatcher() {
        let mut d = Dispatcher::new();
        d.register(Box::new(
            Logger::new().set_path("tests/test_dispatcher/log"),
        ));
        d.register(Box::new(TestService::new("test")));
        d.start();
        clean("test_dispatcher").expect("FAILED TO CLEAN");
    }

    /// Registers a local database service next to a test service and runs the
    /// dispatcher.
    #[tokio::test]
    async fn test_dispatch_db() {
        let mut d = Dispatcher::new();
        let db_type = services::database::DatabaseType::LOCAL;
        let db = match services::Database::new("test", "test", db_type).await {
            Ok(db) => db,
            Err(e) => panic!("test_dispatch_db: {}", e),
        };
        let tt = Box::new(TestService::new("test_service"));
        assert_eq!(db.identify(), db.name);
        assert_eq!(tt.identify(), "test_service");
        d.register(Box::new(db));
        d.register(tt);
        d.start();
    }

    /// Logging to the default path succeeds.
    #[test]
    fn test_log() {
        let logger = Logger::new();
        assert!(
            logger.log("This is a log from the test_log test").is_ok(),
            "Logging failed"
        );
        // Nothing to clean: this test writes to the default `./logs/log`,
        // not under `logs/tests`.
    }

    /// Logging to a custom path succeeds and the entry reaches the file.
    #[test]
    fn test_custom_log() {
        let message = "This is a log from the test_custom_log test";
        // The leading slash is kept from the original test input.
        let logger = Logger::new().set_path("/tests/test_custom_log/custom_logs");
        assert!(logger.log(message).is_ok(), "Logging failed");

        let written = fs::read_to_string("logs/tests/test_custom_log/custom_logs")
            .expect("log file should exist after logging");
        assert!(written.contains(message), "Log entry missing from file");

        clean("test_custom_log").expect("FAILED TO CLEAN");
    }
}