use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::collections::HashMap;
use common::prelude::*;
use common::state::State;
use common::config::{Config, HttpConfig};
use scripts::{Blueprint, Repository, JobContext};
use processor::{Processor, ProcessorApi};
use web::WebApp;
/// Internal application state shared by the public `Fisher` wrapper.
///
/// Groups the scripts blueprint, the job processor and the (optional) web
/// application so they can be locked, reconfigured and stopped together.
struct InnerApp {
/// Whether the application is currently locked. Set by `lock()`, cleared by
/// `unlock()`, and consulted by `restart_http_server()` so that a freshly
/// created web server starts out locked as well.
locked: bool,
/// Blueprint from which the scripts repository handed to the processor and
/// to the web server is obtained.
scripts_blueprint: Blueprint,
/// Job processor operating on the scripts repository.
processor: Processor<Repository>,
/// Running web application; `None` until `restart_http_server()` has
/// succeeded at least once.
http: Option<WebApp<ProcessorApi<Repository>>>,
}
impl InnerApp {
    /// Creates the application core with a fresh shared state, an empty
    /// scripts blueprint, a processor and no web server.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Processor::new`.
    fn new() -> Result<Self> {
        let state = Arc::new(State::new());
        let blueprint = Blueprint::new(Arc::clone(&state));
        let processor = Processor::new(
            // NOTE(review): presumably the initial number of worker threads;
            // the real value is applied later through `set_threads_count`.
            0,
            Arc::new(blueprint.repository()),
            JobContext::default(),
            // Last use of the handle: move it instead of cloning again.
            state,
        )?;

        Ok(InnerApp {
            locked: false,
            scripts_blueprint: blueprint,
            http: None,
            processor,
        })
    }

    /// Stops the running web server (if any) and starts a new one with the
    /// given configuration.
    ///
    /// The previous server is stopped *before* the new one is created, so if
    /// `WebApp::new` fails the application is left without a web server.
    /// When the application is currently locked the new server is locked as
    /// well before being stored.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `WebApp::new`.
    fn restart_http_server(&mut self, config: &HttpConfig) -> Result<()> {
        if let Some(previous) = self.http.take() {
            previous.stop();
        }

        let http = WebApp::new(
            Arc::new(self.scripts_blueprint.repository()),
            config,
            self.processor.api(),
        )?;

        // Keep the new server consistent with the current lock state.
        if self.locked {
            http.lock();
        }

        self.http = Some(http);
        Ok(())
    }

    /// Replaces the collected scripts with the ones found at `path`.
    ///
    /// The blueprint is cleared first, then `path` is collected (descending
    /// into sub-directories according to `recursive`), and finally the
    /// processor is asked to clean up.
    ///
    /// # Errors
    ///
    /// Propagates errors from collecting the path or from the processor
    /// cleanup. The blueprint has already been cleared when collection fails.
    fn set_scripts_path<P: AsRef<Path>>(
        &mut self,
        path: P,
        recursive: bool,
    ) -> Result<()> {
        self.scripts_blueprint.clear();
        self.scripts_blueprint.collect_path(path, recursive)?;
        self.processor.api().cleanup()?;
        Ok(())
    }

    /// Updates the processor's job context so that it carries `env` as its
    /// environment; every other context field keeps its default value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the processor API.
    fn set_job_environment(&self, env: HashMap<String, String>) -> Result<()> {
        self.processor.api().update_context(JobContext {
            environment: env,
            ..JobContext::default()
        })?;
        Ok(())
    }

    /// Forwards the requested thread count to the processor.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the processor API.
    fn set_threads_count(&self, count: u16) -> Result<()> {
        self.processor.api().set_threads_count(count)?;
        Ok(())
    }

    /// Returns the address of the web server, or `None` when no web server
    /// is currently stored.
    fn http_addr(&self) -> Option<&SocketAddr> {
        self.http.as_ref().map(|http| http.addr())
    }

    /// Locks the web server (if any) and the processor, then records the
    /// application as locked.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the processor lock. In that case the
    /// web server lock taken by this call is released again, so a failed
    /// `lock()` does not leave the web server locked while the application
    /// still considers itself unlocked.
    fn lock(&mut self) -> Result<()> {
        if let Some(http) = self.http.as_ref() {
            http.lock();
        }

        let outcome = self.processor.api().lock();

        // Roll back the web server lock on failure, unless the application
        // was already locked before this call (then it must stay locked).
        if outcome.is_err() && !self.locked {
            if let Some(http) = self.http.as_ref() {
                http.unlock();
            }
        }
        outcome?;

        self.locked = true;
        Ok(())
    }

    /// Unlocks the processor and then the web server (if any), and records
    /// the application as unlocked.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the processor unlock; in that case
    /// the web server is left untouched and the application stays marked as
    /// locked.
    fn unlock(&mut self) -> Result<()> {
        self.processor.api().unlock()?;

        if let Some(http) = self.http.as_ref() {
            http.unlock();
        }

        self.locked = false;
        Ok(())
    }

    /// Shuts the application down, consuming it.
    ///
    /// The web server is locked first, then the processor is stopped, and
    /// finally the web server itself is stopped.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by stopping the processor. The web
    /// server is stopped even in that case, because `self` is consumed and
    /// nobody could stop it afterwards.
    fn stop(mut self) -> Result<()> {
        if let Some(http) = self.http.as_ref() {
            http.lock();
        }

        let outcome = self.processor.stop();

        // Always stop the web server, regardless of the processor outcome.
        if let Some(http) = self.http.take() {
            http.stop();
        }

        outcome?;
        Ok(())
    }
}
/// Public handle to a running application instance.
///
/// Created with `Fisher::new`, reconfigured with `Fisher::reload` and shut
/// down with `Fisher::stop`.
pub struct Fisher {
/// Configuration the instance was last successfully set up with; `reload`
/// compares incoming configurations against it.
config: Config,
/// The actual application components (scripts, processor, web server).
inner: InnerApp,
}
impl Fisher {
    /// Builds a new instance and applies `config` to it: scripts are
    /// collected, the job environment and thread count are set, and the web
    /// server is started, in that order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of those steps.
    pub fn new(config: Config) -> Result<Self> {
        let mut app = InnerApp::new()?;

        let scripts = &config.scripts;
        app.set_scripts_path(&scripts.path, scripts.recursive)?;
        app.set_job_environment(config.env.clone())?;
        app.set_threads_count(config.jobs.threads)?;
        app.restart_http_server(&config.http)?;

        Ok(Self { config, inner: app })
    }

    /// Returns the address of the web server, if one is present.
    pub fn web_address(&self) -> Option<&SocketAddr> {
        self.inner.http_addr()
    }

    /// Applies `new_config` to the running instance.
    ///
    /// The instance is locked for the duration of the reload and unlocked
    /// again afterwards, whether or not the reload itself succeeded.
    ///
    /// # Errors
    ///
    /// Returns the lock error if locking fails (nothing is reloaded then).
    /// Otherwise an unlock error takes precedence over the reload outcome;
    /// when unlocking succeeds the reload outcome is returned.
    pub fn reload(&mut self, new_config: Config) -> Result<()> {
        self.inner.lock()?;
        let outcome = self.reload_inner(new_config);
        self.inner.unlock().and(outcome)
    }

    /// Applies the parts of `new_config` that differ from the stored
    /// configuration, then stores `new_config`.
    ///
    /// The web server, job environment and thread count are only touched
    /// when their configuration changed; the scripts are always collected
    /// again.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered. Steps that already succeeded
    /// stay applied, while the stored configuration keeps its old value.
    fn reload_inner(&mut self, new_config: Config) -> Result<()> {
        let (current, incoming) = (&self.config, &new_config);

        if current.http != incoming.http {
            self.inner.restart_http_server(&incoming.http)?;
        }

        if current.env != incoming.env {
            self.inner.set_job_environment(incoming.env.clone())?;
        }

        if current.jobs.threads != incoming.jobs.threads {
            self.inner.set_threads_count(incoming.jobs.threads)?;
        }

        let scripts = &incoming.scripts;
        self.inner.set_scripts_path(&scripts.path, scripts.recursive)?;

        // Everything was applied: remember the new configuration.
        self.config = new_config;
        Ok(())
    }

    /// Shuts the instance down, consuming it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while stopping the inner application.
    pub fn stop(self) -> Result<()> {
        let Self { inner, .. } = self;
        inner.stop()
    }
}