#![allow(clippy::type_complexity)]
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::CTRLC_HANDLER;
use crate::actions::observe::ObserveAction;
use crate::entities::driver::PipelineDriver;
use crate::entities::environment::RunEnvironment;
use crate::entities::info::ShortName;
use crate::entities::remote_host::RemoteHost;
use crate::entities::variables::Variable;
pub struct OwnedEnvironment {
pub run_dir: PathBuf,
_empty_path: PathBuf,
_empty_str: String,
_empty_remotes: BTreeMap<ShortName, RemoteHost>,
_empty_ignore: BTreeSet<PathBuf>,
}
impl OwnedEnvironment {
pub fn new(run_dir: &Path) -> Self {
Self {
run_dir: run_dir.to_path_buf(),
_empty_path: PathBuf::new(),
_empty_str: String::new(),
_empty_remotes: BTreeMap::new(),
_empty_ignore: BTreeSet::new(),
}
}
pub fn to_env(&self) -> RunEnvironment<'_> {
RunEnvironment {
run_dir: &self.run_dir,
config_dir: &self._empty_path,
new_build: false,
cache_dir: &self._empty_path,
project_dir: None,
storage_dir: &self._empty_path,
artifacts_dir: &self._empty_path,
artifacts_placements: &[],
remotes: &self._empty_remotes,
ignore: &self._empty_ignore,
log_file: None,
containered_build: false,
containered_run: false,
master_pipeline: &self._empty_str,
ansible_run: false,
daemons: Default::default(),
observe_cli: &None,
skipper: Default::default(),
driver: PipelineDriver::Deployer,
restart_requested: None,
}
}
}
pub struct ObserveExecutor {
pub pipeline_listener: mpsc::Receiver<(
mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
mpsc::Sender<bool>,
)>,
pub pipeline_client: mpsc::Sender<(
mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
mpsc::Sender<bool>,
)>,
pub early_exit_tx: mpsc::Sender<()>,
pub early_exit_rx: mpsc::Receiver<()>,
}
#[derive(Clone)]
pub struct ObserveClient {
pub tx: mpsc::Sender<(
mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
mpsc::Sender<bool>,
)>,
}
impl Default for ObserveExecutor {
fn default() -> Self {
let (tx, rx) = mpsc::channel(1);
let (eetx, eerx) = mpsc::channel(1);
Self {
pipeline_listener: rx,
pipeline_client: tx,
early_exit_tx: eetx,
early_exit_rx: eerx,
}
}
}
impl ObserveExecutor {
pub fn make_client(&self) -> ObserveClient {
ObserveClient {
tx: self.pipeline_client.clone(),
}
}
pub async fn run(mut self) -> anyhow::Result<()> {
let mut curr_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
let mut curr_pair: Option<(
mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
mpsc::Sender<bool>,
)> = None;
loop {
if let Some((mut jrrx, jftx)) = curr_pair.take() {
tokio::select! {
j = jrrx.recv() => {
crate::rw::log("\tOE: New observer task");
let (job, owned_env, vars) = if let Some((job, owned_env, vars)) = j {
(job, owned_env, vars)
} else {
continue
};
if let Some(handle) = curr_handle.take() {
handle.abort();
let mut guard = CTRLC_HANDLER.lock().await;
if let Some(child) = guard.as_mut() {
if let Err(e) = child.kill() {
crate::rw::log(format!("Error on observer kill: {e:?}"));
}
*guard = None;
}
crate::rw::log("\tOE: Handle aborted");
}
curr_handle = Some(tokio::task::spawn({
let jftx = jftx.clone();
async move {
let env = owned_env.to_env();
let (res, _) = job.command.execute_observer(&env, &vars).await?;
let _ = jftx.send(res).await;
Ok(())
}
}));
crate::rw::log("\tOE: New task spawned");
curr_pair = Some((jrrx, jftx));
}
pair = self.pipeline_listener.recv() => {
if let Some((new_jrrx, new_jftx)) = pair {
crate::rw::log("\tOE: New task exchange channel received");
curr_pair = Some((new_jrrx, new_jftx));
} else {
curr_pair = Some((jrrx, jftx));
}
}
_ = self.early_exit_rx.recv() => {
crate::rw::log("\tOE: Early exit received");
if let Some(handle) = curr_handle.take() {
handle.abort();
let mut guard = CTRLC_HANDLER.lock().await;
if let Some(child) = guard.as_mut() {
if let Err(e) = child.kill() {
crate::rw::log(format!("Error on observer kill: {e:?}"));
}
*guard = None;
}
}
crate::rw::log("\tOE: Handle aborted");
return Ok(())
}
}
} else {
tokio::select! {
pair = self.pipeline_listener.recv() => {
if let Some((new_jrrx, new_jftx)) = pair {
crate::rw::log("\tOE: New task exchange channel received");
curr_pair = Some((new_jrrx, new_jftx));
}
}
_ = self.early_exit_rx.recv() => {
crate::rw::log("\tOE: Early exit received");
if let Some(handle) = curr_handle.take() {
handle.abort();
let mut guard = CTRLC_HANDLER.lock().await;
if let Some(child) = guard.as_mut() {
if let Err(e) = child.kill() {
crate::rw::log(format!("Error on observer kill: {e:?}"));
}
*guard = None;
}
}
crate::rw::log("\tOE: Handle aborted");
return Ok(())
}
}
}
}
}
}