depl 2.4.3

Toolkit for a bunch of local and remote CI/CD actions
Documentation
//! Observe Actions executor module.

#![allow(clippy::type_complexity)]

use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::CTRLC_HANDLER;
use crate::actions::observe::ObserveAction;
use crate::entities::driver::PipelineDriver;
use crate::entities::environment::RunEnvironment;
use crate::entities::info::ShortName;
use crate::entities::remote_host::RemoteHost;
use crate::entities::variables::Variable;

/// Small owned run environment to run only `Observe` Actions.
///
/// `RunEnvironment` only borrows its data, so this struct owns the values those borrows point to. That lets it be
/// sent through channels together with an `ObserveAction` and turned into a `RunEnvironment` later via `to_env`.
pub struct OwnedEnvironment {
  /// Actual run folder. This is where Deployer runs all the Actions.
  pub run_dir: PathBuf,

  /// Empty path that `to_env` lends to every directory field except `run_dir`.
  _empty_path: PathBuf,
  /// Empty string that `to_env` lends as the master pipeline name.
  _empty_str: String,
  /// Empty remote hosts map that `to_env` lends as `remotes`.
  _empty_remotes: BTreeMap<ShortName, RemoteHost>,
  /// Empty path set that `to_env` lends as `ignore`.
  _empty_ignore: BTreeSet<PathBuf>,
}

impl OwnedEnvironment {
  /// Builds an owned run environment rooted at `run_dir`.
  ///
  /// Only the run folder carries real data; every other backing value starts out empty.
  pub fn new(run_dir: &Path) -> Self {
    let run_dir = PathBuf::from(run_dir);
    Self {
      run_dir,

      _empty_path: PathBuf::default(),
      _empty_str: String::default(),
      _empty_remotes: BTreeMap::default(),
      _empty_ignore: BTreeSet::default(),
    }
  }

  /// Lends `self` as a `RunEnvironment`.
  ///
  /// `run_dir` is the only meaningful field: all other directories point to the same empty path, collections are
  /// empty, flags are `false` and optional values are `None`.
  pub fn to_env(&self) -> RunEnvironment<'_> {
    // One empty path backs every directory the observer does not use.
    let no_path: &PathBuf = &self._empty_path;

    RunEnvironment {
      // Directories.
      run_dir: &self.run_dir,
      config_dir: no_path,
      cache_dir: no_path,
      storage_dir: no_path,
      artifacts_dir: no_path,
      project_dir: None,

      // Empty collections and names.
      artifacts_placements: &[],
      remotes: &self._empty_remotes,
      ignore: &self._empty_ignore,
      master_pipeline: &self._empty_str,

      // Flags: everything is switched off.
      new_build: false,
      containered_build: false,
      containered_run: false,
      ansible_run: false,

      // Remaining state is absent or defaulted.
      log_file: None,
      daemons: Default::default(),
      observe_cli: &None,
      skipper: Default::default(),
      driver: PipelineDriver::Deployer,
      restart_requested: None,
    }
  }
}

/// External executor of `Observe` Actions.
///
/// Pipelines register themselves by sending a pair of channels: a job receiver that yields
/// `(ObserveAction, OwnedEnvironment, variables)` tuples and a sender for the `bool` result of an observer run.
pub struct ObserveExecutor {
  /// When a new Pipeline plugs in, `ObserveExecutor` must switch to its job receiver and provided result sender.
  ///
  /// Each received item is that `(job receiver, result sender)` pair.
  pub pipeline_listener: mpsc::Receiver<(
    mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
    mpsc::Sender<bool>,
  )>,
  /// Sending half of `pipeline_listener`, meant for Pipelines. It is cloned into every `ObserveClient` by
  /// `make_client`.
  pub pipeline_client: mpsc::Sender<(
    mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
    mpsc::Sender<bool>,
  )>,

  /// Early exit sender. A message sent here makes `ObserveExecutor::run` stop the current observer and return.
  pub early_exit_tx: mpsc::Sender<()>,
  /// Early exit receiver, polled by `ObserveExecutor::run`.
  pub early_exit_rx: mpsc::Receiver<()>,
}

/// Simple shell around the sender of task exchange channel pairs. To use this client, the caller provides a job
/// receiver and a result sender.
#[derive(Clone)]
pub struct ObserveClient {
  /// Sender of task exchange channel pairs.
  ///
  /// Each item is a `(job receiver, result sender)` pair: the receiver yields
  /// `(ObserveAction, OwnedEnvironment, variables)` jobs and the sender carries the `bool` result back.
  pub tx: mpsc::Sender<(
    mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
    mpsc::Sender<bool>,
  )>,
}

impl Default for ObserveExecutor {
  /// Builds an executor with freshly created channels.
  ///
  /// Both the pipeline registration channel and the early exit channel have a capacity of one message.
  fn default() -> Self {
    let (pipeline_client, pipeline_listener) = mpsc::channel(1);
    let (early_exit_tx, early_exit_rx) = mpsc::channel(1);

    Self {
      pipeline_listener,
      pipeline_client,
      early_exit_tx,
      early_exit_rx,
    }
  }
}

impl ObserveExecutor {
  /// Creates `ObserveClient` for a new Pipeline run.
  pub fn make_client(&self) -> ObserveClient {
    ObserveClient {
      tx: self.pipeline_client.clone(),
    }
  }

  /// Runs executor's logic.
  ///
  /// This method eventually moves `ObserveExecutor` and blocks current thread, so use it with `tokio::task::spawn`.
  pub async fn run(mut self) -> anyhow::Result<()> {
    let mut curr_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
    let mut curr_pair: Option<(
      mpsc::Receiver<(ObserveAction, OwnedEnvironment, BTreeMap<String, Variable>)>,
      mpsc::Sender<bool>,
    )> = None;

    loop {
      if let Some((mut jrrx, jftx)) = curr_pair.take() {
        tokio::select! {
          // handle new observer task
          j = jrrx.recv() => {
            crate::rw::log("\tOE: New observer task");
            let (job, owned_env, vars) = if let Some((job, owned_env, vars)) = j {
              (job, owned_env, vars)
            } else {
              // channel closed, need to exit
              continue
            };

            if let Some(handle) = curr_handle.take() {
              handle.abort();
              let mut guard = CTRLC_HANDLER.lock().await;
              if let Some(child) = guard.as_mut() {
                if let Err(e) = child.kill() {
                  crate::rw::log(format!("Error on observer kill: {e:?}"));
                }
                *guard = None;
              }
              crate::rw::log("\tOE: Handle aborted");
            }
            curr_handle = Some(tokio::task::spawn({
              let jftx = jftx.clone();
              async move {
                let env = owned_env.to_env();
                let (res, _) = job.command.execute_observer(&env, &vars).await?;
                let _ = jftx.send(res).await;
                Ok(())
              }
            }));
            crate::rw::log("\tOE: New task spawned");
            // return pair into state, because each pipeline may have several observe actions
            curr_pair = Some((jrrx, jftx));
          }
          // handle new task exchange pair
          pair = self.pipeline_listener.recv() => {
            if let Some((new_jrrx, new_jftx)) = pair {
              crate::rw::log("\tOE: New task exchange channel received");
              curr_pair = Some((new_jrrx, new_jftx));
            } else {
              curr_pair = Some((jrrx, jftx));
            }
          }
          // handle early exit
          _ = self.early_exit_rx.recv() => {
            crate::rw::log("\tOE: Early exit received");
            if let Some(handle) = curr_handle.take() {
              handle.abort();
              let mut guard = CTRLC_HANDLER.lock().await;
              if let Some(child) = guard.as_mut() {
                if let Err(e) = child.kill() {
                  crate::rw::log(format!("Error on observer kill: {e:?}"));
                }
                *guard = None;
              }
            }
            crate::rw::log("\tOE: Handle aborted");
            return Ok(())
          }
        }
      } else {
        tokio::select! {
          // handle new task exchange pair
          pair = self.pipeline_listener.recv() => {
            if let Some((new_jrrx, new_jftx)) = pair {
              crate::rw::log("\tOE: New task exchange channel received");
              curr_pair = Some((new_jrrx, new_jftx));
            }
          }
          // handle early exit
          _ = self.early_exit_rx.recv() => {
            crate::rw::log("\tOE: Early exit received");
            if let Some(handle) = curr_handle.take() {
              handle.abort();
              let mut guard = CTRLC_HANDLER.lock().await;
              if let Some(child) = guard.as_mut() {
                if let Err(e) = child.kill() {
                  crate::rw::log(format!("Error on observer kill: {e:?}"));
                }
                *guard = None;
              }
            }
            crate::rw::log("\tOE: Handle aborted");
            return Ok(())
          }
        }
      }
    }
  }
}