use std::{collections::HashMap, sync::Arc};
use crate::context::Context;
use crate::service::Service;
use colored::*;
use futures::{SinkExt, StreamExt};
use serde::de::DeserializeOwned;
use uuid::Uuid;
use warp::{ws::Ws, Filter};
/// Shareable callback that receives the raw text payload of one message.
type ServiceHandler = Arc<Box<dyn Fn(&str) + Send + Sync>>;

/// A node that hosts a set of message-handling services.
///
/// Services are stored as type-erased callbacks keyed by their path, so
/// services with different message types can live in the same map.
pub struct App {
    /// Registered message handlers, keyed by service path.
    services: HashMap<String, ServiceHandler>,
    /// TCP port the server listens on.
    port: u16,
    /// Identifier of this node; printed in the startup banner.
    node_id: Uuid,
}
impl App {
    /// Creates an application with no registered services, configured to
    /// listen on port 3030 and identified by a freshly generated random
    /// node id.
    pub fn new() -> Self {
        Self {
            services: HashMap::new(),
            port: 3030,
            node_id: Uuid::new_v4(),
        }
    }

    /// Registers service `T` under the path returned by
    /// `T::info().get_path()` and returns the updated application.
    ///
    /// The stored handler deserializes each raw payload from JSON into
    /// `T::Message` and forwards it to `T::on_message`. A payload that does
    /// not parse is logged and dropped: the payload comes from a remote
    /// client, so malformed input must not panic the connection task.
    ///
    /// Registering a second service with the same path replaces the first.
    pub fn add_service<T: DeserializeOwned>(mut self) -> Self
    where
        T: Service,
    {
        let handler = |raw: &str| {
            match serde_json::from_str::<T::Message>(raw) {
                Ok(message) => {
                    T::on_message(message, &Context);
                }
                Err(err) => {
                    println!(" {} {} {}", "=/=".yellow(), "Error:".bold().red(), err);
                }
            }
        };
        let path = T::info().get_path();
        self.services.insert(path, Arc::new(Box::new(handler)));
        self
    }

    /// Prints the startup banner and serves WebSocket connections on
    /// `127.0.0.1:<port>` at `/service/<name>`.
    ///
    /// A request whose `<name>` matches a registered service is upgraded to
    /// a WebSocket and its text messages are passed to that service's
    /// handler. A request for an unknown name is rejected as "not found"
    /// instead of panicking.
    pub async fn run(self) {
        self.print_startup();
        let services = Arc::new(self.services);
        let route = warp::path!("service" / String)
            .and(warp::ws())
            .and_then(move |service_name: String, ws: Ws| {
                // Look the handler up before upgrading so that an unknown
                // service name can be turned into a rejection.
                let handler = services.get(&service_name).cloned();
                async move {
                    match handler {
                        Some(handler) => {
                            println!(" {} New connection on {}", "=-=".green(), service_name);
                            Ok(ws.on_upgrade(move |websocket| {
                                Self::serve_connection(websocket, handler)
                            }))
                        }
                        None => {
                            println!(
                                " {} {} unknown service {}",
                                "=/=".yellow(),
                                "Error:".bold().red(),
                                service_name
                            );
                            Err(warp::reject::not_found())
                        }
                    }
                }
            });
        warp::serve(route).run(([127, 0, 0, 1], self.port)).await;
    }

    /// Drives one upgraded WebSocket connection until the stream ends or a
    /// receive error occurs.
    ///
    /// Every text message is handed to `handler` and answered with a
    /// `"Received"` text message.
    async fn serve_connection(
        websocket: warp::ws::WebSocket,
        handler: Arc<Box<dyn Fn(&str) + Send + Sync>>,
    ) {
        let (mut tx, mut rx) = websocket.split();
        while let Some(result) = rx.next().await {
            let msg = match result {
                Ok(msg) => msg,
                Err(e) => {
                    println!(" {} {} {}", "=/=".yellow(), "Error:".bold().red(), e);
                    break;
                }
            };
            // Messages that are not text are skipped without a reply.
            if let Ok(text) = msg.to_str() {
                handler(text);
                // The acknowledgement is best-effort: a failed send is
                // deliberately ignored.
                tx.send(warp::ws::Message::text("Received")).await.ok();
            }
        }
    }

    /// Prints the startup banner: address, port, node id and the path of
    /// every registered service (in the map's iteration order).
    fn print_startup(&self) {
        println!();
        println!();
        println!(" 🔭 {}", "Kaffix".bold().purple());
        println!(" {} Address: localhost", "⍿".green());
        println!(" {} Port: {}", "⍿".green(), self.port);
        println!(" {} Node ID: {}", "⍿".green(), self.node_id);
        println!();
        println!(" 📡 {}", "Services".bold().purple());
        for service in self.services.keys() {
            println!(" {} {}", "⍿".green(), service);
        }
        println!();
        println!(" 🛰 {}", "Executing".bold().purple());
    }
}