use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use actix::{Arbiter, System};
use serde::{Deserialize, Serialize};
use crate::communication::{MethodCall, MethodResult};
use crate::method;
use crate::method::{CallableMethod, MethodCallable};
pub trait MethodFactory {
fn build(&self) -> Box<dyn CallableMethod>;
}
impl<F, C> MethodFactory for F
where
C: 'static + MethodCallable,
F: Fn() -> C,
{
fn build(&self) -> Box<dyn CallableMethod> {
Box::new((self)())
}
}
pub type Application = Arc<InnerApplication>;
#[derive(Default)]
pub struct InnerApplication {
methods: HashMap<String, Box<dyn MethodFactory>>,
}
#[derive(Default)]
pub struct ApplicationBuilder(InnerApplication);
impl ApplicationBuilder {
pub fn new() -> Self {
Default::default()
}
pub fn with(&mut self, method_factory: Box<dyn MethodFactory>) {
let method = method_factory.build();
println!("Registering method {}", method.name());
self.0.methods.insert(method.name(), method_factory);
}
pub fn run(self, server_address: String) -> method::Result<()> {
println!("Starting application");
let application = Arc::new(self.0);
let sys = System::new("application");
let mut arbiter = Arbiter::new();
let thread = std::thread::spawn(move || {
InnerApplication::start_server(application, &mut arbiter, server_address)
.unwrap_or_default()
});
println!("Running application system");
sys.run()?;
println!("Application system stopped");
thread.join()?;
println!("Application server stopped");
Ok(())
}
}
impl InnerApplication {
pub fn call(&self, context: MethodCall) -> MethodResult {
if let Some(factory) = self.methods.get(&context.method_name) {
let mut method = factory.build();
return method.call(context);
}
MethodResult {
code: 1,
message: json!({
"error": "Could not find requested method"
}),
}
}
fn start_server(
application: Application,
arbiter: &mut Arbiter,
server_address: String,
) -> method::Result<()> {
println!("Running application server");
let server = TcpListener::bind(server_address)?;
loop {
if let Ok((client, _addr)) = server.accept() {
arbiter.send(Box::pin(InnerApplication::handle_client(
application.clone(),
client,
)));
}
}
}
async fn handle_client(application: Application, client: TcpStream) {
let read_client = match client.try_clone() {
Ok(client) => client,
_ => return,
};
let mut deserializer = serde_json::Deserializer::from_reader(read_client);
let context = MethodCall::deserialize(&mut deserializer).unwrap_or_default();
let result = application.call(context);
let writer = BufWriter::new(client);
let mut serializer = serde_json::Serializer::new(writer);
result.serialize(&mut serializer).unwrap_or_default();
let mut writer = serializer.into_inner();
writer.write_all("\n".as_bytes()).unwrap_or_default();
writer.flush().unwrap_or_default();
}
}
unsafe impl Send for InnerApplication {}
unsafe impl Sync for InnerApplication {}