mod converter;
mod distributor;
mod host;
mod merger;
mod painter;
pub mod moderator;
use std::{
sync::{
mpsc::{sync_channel, SyncSender},
Arc,
},
thread::{spawn, JoinHandle},
};
pub use host::Host;
use crate::{
image_handler::{Command, ImageConfig},
Client,
};
use crate::service::moderator::Server;
/// Builder that collects the settings for a [`Service`] before it is created.
///
/// Create one with [`ServiceBuilder::new`], adjust it through the chained
/// setter methods and finish with [`ServiceBuilder::build`].
pub struct ServiceBuilder {
/// Target host; cloned for each painter thread (or handed to the `Server`)
/// and used to open the stream behind `Service::get_client`.
host: Host,
/// Number of painter threads spawned by `Service::start` when no listen
/// port is set; otherwise the value is forwarded to `Server::new`.
threads: usize,
/// Image configuration passed to every converter thread.
image_config: ImageConfig,
/// Number of converter threads; `0` means no converter pipeline is started.
converter_threads: usize,
/// Capacity used for every `sync_channel` created by `Service::start`.
channel_limit: usize,
/// When `Some`, `Service::start` runs a moderator `Server` on this port
/// instead of spawning painter threads.
listen_port: Option<u16>,
}
impl ServiceBuilder {
    /// Creates a builder for `host` with the default settings:
    /// 10 threads, the default [`ImageConfig`], 1 converter thread,
    /// a channel limit of 10 and no listen port.
    pub fn new(host: Host) -> Self {
        Self {
            host,
            threads: 10,
            image_config: ImageConfig::default(),
            converter_threads: 1,
            channel_limit: 10,
            listen_port: None,
        }
    }

    /// Creates a builder from a host string via `Host::new(host, None)`.
    ///
    /// # Panics
    ///
    /// Panics if `Host::new` does not yield a host for the given string.
    pub fn new_from_host_str(host: &str) -> Self {
        let parsed = Host::new(host, None).unwrap();
        Self::new(parsed)
    }

    /// Sets the thread count that `Service::start` uses for painter threads
    /// (or forwards to the moderator `Server`).
    pub fn threads(self, threads: usize) -> Self {
        Self { threads, ..self }
    }

    /// Sets the image configuration handed to the converter threads.
    pub fn image_config(self, image_config: ImageConfig) -> Self {
        Self {
            image_config,
            ..self
        }
    }

    /// Sets the number of converter threads; `0` disables the converter pipeline.
    pub fn converter_threads(self, converter_threads: usize) -> Self {
        Self {
            converter_threads,
            ..self
        }
    }

    /// Sets the capacity of the bounded channels created by `Service::start`.
    pub fn channel_limit(self, channel_limit: usize) -> Self {
        Self {
            channel_limit,
            ..self
        }
    }

    /// Makes the service run a moderator `Server` on `port` instead of
    /// spawning its own painter threads.
    pub fn listen_port(self, port: u16) -> Self {
        Self {
            listen_port: Some(port),
            ..self
        }
    }

    /// Consumes the builder and creates the (not yet started) [`Service`].
    pub fn build(self) -> Service {
        let Self {
            host,
            threads,
            image_config,
            converter_threads,
            channel_limit,
            listen_port,
        } = self;
        Service::new(
            host,
            threads,
            image_config,
            converter_threads,
            channel_limit,
            listen_port,
        )
    }
}
/// A set of worker threads connected by bounded channels.
///
/// The service is inert after construction; `start` spawns the threads and
/// `stop` drops the input senders and joins them again.
pub struct Service {
/// Target host; cloned for each painter thread (or the `Server`) and used
/// to open the stream for the worker client.
host: Host,
/// Painter thread count, or the value forwarded to `Server::new` when a
/// listen port is configured.
threads: usize,
/// Current image configuration; copied into each converter thread on start.
image_config: ImageConfig,
/// Client created on demand by `get_client`; `None` until first requested
/// or after an errored client has been discarded.
worker_client: Option<Client>,
/// Number of converter threads; `0` means no converter pipeline is started.
converter_threads: usize,
/// Capacity used for every `sync_channel` created by `start`.
channel_limit: usize,
/// When `Some`, `start` runs a moderator `Server` on this port instead of
/// spawning painter threads.
listen_port: Option<u16>,
/// Sender feeding the converter distributor; `Some` only between `start`
/// (with at least one converter thread) and `stop`.
converter_input: Option<SyncSender<distributor::DistributorChange>>,
/// Sender for paint commands; `Some` exactly while the service is started,
/// which is how the other methods detect the running state.
painter_input: Option<SyncSender<Arc<Command>>>,
/// Handles of all threads spawned by `start`; drained by `join`.
join_handles: Vec<JoinHandle<()>>,
}
impl Service {
    /// Creates a service that has not been started yet.
    ///
    /// No threads or channels exist until [`Service::start`] is called.
    pub fn new(
        host: Host,
        threads: usize,
        image_config: ImageConfig,
        converter_threads: usize,
        channel_limit: usize,
        listen_port: Option<u16>,
    ) -> Self {
        Self {
            host,
            threads,
            image_config,
            converter_threads,
            channel_limit,
            listen_port,
            worker_client: None,
            converter_input: None,
            painter_input: None,
            join_handles: Vec::new(),
        }
    }

    /// Spawns all worker threads and connects them with bounded channels.
    ///
    /// With at least one converter thread, a converter distributor, the
    /// converters and a merger are started; the merger forwards into the
    /// command channel. The command channel is consumed either by a
    /// moderator `Server` (when a listen port is set) or by a painter
    /// distributor feeding `threads` painter threads.
    ///
    /// # Panics
    ///
    /// Panics if the service is already started.
    pub fn start(&mut self) {
        assert!(
            self.painter_input.is_none(),
            "Can not start Service twice!"
        );

        let limit = self.channel_limit;
        let (command_tx, command_rx) = sync_channel(limit);
        // The stored sender doubles as the "service is running" marker.
        self.painter_input = Some(command_tx.clone());

        if self.converter_threads > 0 {
            let (merger_tx, merger_rx) = sync_channel(limit);

            // One private input channel per converter; all of them report to the merger.
            let mut converter_senders = Vec::new();
            for _ in 0..self.converter_threads {
                let (tx, rx) = sync_channel(limit);
                let worker = converter::get_converter(self.image_config, rx, merger_tx.clone());
                self.join_handles.push(spawn(worker));
                converter_senders.push(tx);
            }

            let (distributor_tx, distributor_rx) = sync_channel(limit);
            self.converter_input = Some(distributor_tx);

            let fan_out = distributor::get_converter_distributor(distributor_rx, converter_senders);
            self.join_handles.push(spawn(fan_out));

            let fan_in = merger::get_merger(merger_rx, command_tx.clone());
            self.join_handles.push(spawn(fan_in));
        }

        match self.listen_port {
            Some(port) => {
                let server = Server::new(port, self.host.clone(), self.threads, command_rx);
                self.join_handles.push(spawn(move || server.listen()));
            }
            None => {
                let total = self.threads;
                let mut painter_senders = Vec::new();
                for index in 0..total {
                    let (tx, rx) = sync_channel(limit);
                    painter_senders.push(tx);
                    let worker = painter::get_painter(rx, self.host.clone(), index, total);
                    self.join_handles.push(spawn(worker));
                }

                let fan_out = distributor::get_painter_distributor(command_rx, painter_senders);
                self.join_handles.push(spawn(fan_out));
            }
        }
    }

    /// Panics unless the service has been started.
    fn start_check(&self) {
        assert!(self.painter_input.is_some(), "Service not started!");
    }

    /// Stores a new image configuration and, if the converter pipeline is
    /// running, forwards it to the converter distributor.
    ///
    /// The send blocks while the channel is full; a send error is ignored.
    pub fn change_image_config(&mut self, image_config: ImageConfig) {
        self.image_config = image_config;
        if let Some(sender) = self.converter_input.as_ref() {
            let update = distributor::DistributorChange::Config(image_config);
            let _ = sender.send(update);
        }
    }

    /// Queues an image for conversion without blocking.
    ///
    /// Uses `try_send`, so the image is silently dropped when the channel is
    /// full or disconnected.
    ///
    /// # Panics
    ///
    /// Panics if the service is not started or was started without
    /// converter threads.
    pub fn send_image(&self, image: image::DynamicImage) {
        self.start_check();
        match self.converter_input.as_ref() {
            Some(sender) => {
                let _ = sender.try_send(distributor::DistributorChange::Image(image));
            }
            None => panic!("Cannot send image without converter threads!"),
        }
    }

    /// Sends a command into the command channel, blocking while it is full.
    ///
    /// A send error is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the service is not started.
    pub fn send_command(&self, command: Arc<Command>) {
        self.start_check();
        // `start_check` guarantees the sender is present at this point.
        if let Some(sender) = self.painter_input.as_ref() {
            let _ = sender.send(command);
        }
    }

    /// Waits for every spawned thread to finish, ignoring join errors.
    ///
    /// NOTE(review): the threads presumably only finish once their input
    /// channels are closed, so calling this without `stop` may block
    /// indefinitely — confirm against the worker implementations.
    pub fn join(&mut self) {
        self.join_handles.drain(..).for_each(|handle| {
            let _ = handle.join();
        });
    }

    /// Drops both input senders and then joins all threads.
    ///
    /// Afterwards the service counts as not started.
    pub fn stop(&mut self) {
        drop(self.converter_input.take());
        drop(self.painter_input.take());
        self.join();
    }

    /// Returns the worker client, creating it on first use.
    ///
    /// A client that reports an error is shut down and replaced by a new
    /// one connected through a fresh stream from the host.
    ///
    /// # Errors
    ///
    /// Returns the error from `Host::new_stream` if a new stream cannot be
    /// opened.
    pub fn get_client(&mut self) -> std::io::Result<&mut Client> {
        let errored = self
            .worker_client
            .as_ref()
            .map_or(false, |client| client.is_error());
        if errored {
            if let Some(broken) = self.worker_client.take() {
                let _ = broken.shutdown();
            }
        }

        if self.worker_client.is_none() {
            let stream = self.host.new_stream()?;
            self.worker_client = Some(Client::new(stream));
        }
        Ok(self.worker_client.as_mut().unwrap())
    }

    /// Starts the service if necessary and then calls `callback` repeatedly
    /// until the service is no longer started (e.g. the callback called
    /// [`Service::stop`]).
    ///
    /// The callback always runs at least once.
    pub fn loop_callback<F>(&mut self, mut callback: F)
    where
        F: FnMut(&mut Service),
    {
        if self.painter_input.is_none() {
            self.start();
        }

        callback(self);
        while self.painter_input.is_some() {
            callback(self);
        }
    }
}