depl 2.4.3

Toolkit for a bunch of local and remote CI/CD actions
Documentation
//! Observe action.
//!
//! YAML example:
//!
//! ```yaml
//! type: observe
//! cmd: btop
//! ```
//!
//! Observe action is used when you need to disable I/O redirection and
//! interact with running programs.

use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::sync::mpsc;

use crate::actions::UsedAction;
use crate::entities::custom_command::CustomCommand;
use crate::entities::environment::RunEnvironment;
use crate::entities::info::ShortName;
use crate::entities::observe_executor::OwnedEnvironment;
use crate::entities::variables::Variable;

/// Observe action.
///
/// This action type implicitly overrides `no_pipe` run argument for given command.
#[derive(Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct ObserveAction {
  /// Underlying command to execute.
  #[serde(flatten)]
  pub command: CustomCommand,
}

impl ObserveAction {
  /// Sets up an action from terminal.
  pub fn prompt_setup_for_project(
    &self,
    used_action: &UsedAction,
    variables: &BTreeMap<ShortName, Variable>,
  ) -> anyhow::Result<UsedAction> {
    self.command.prompt_setup_for_project(used_action, variables)
  }

  /// Executes observer command either inside `ObserveExecutor` or usual environment.
  pub async fn execute(
    &self,
    env: &RunEnvironment<'_>,
    vars: &BTreeMap<String, Variable>,
  ) -> anyhow::Result<(bool, Vec<String>)> {
    if let Some(cli) = env.observe_cli {
      let (jrtx, jrrx) = mpsc::channel(1);
      let (jftx, mut jfrx) = mpsc::channel(1);
      if let Err(e) = cli.tx.send((jrrx, jftx)).await {
        crate::rw::log(format!("Can't send task exchange channels! Error: {e:?}"));
        return Ok((false, vec![]));
      }

      let owned_env = OwnedEnvironment::new(env.run_dir);
      let observe = self.clone();

      if let Err(e) = jrtx.send((observe, owned_env, vars.clone())).await {
        crate::rw::log(format!("Can't send `observe` task! Error: {e:?}"));
        return Ok((false, vec![]));
      }

      // If restart signal is available, monitor it while waiting for observe action
      if let Some(restart_signal) = &env.restart_requested {
        loop {
          tokio::select! {
            result = jfrx.recv() => {
              return Ok((result.unwrap_or(false), vec![]));
            }
            _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
              if restart_signal.load(std::sync::atomic::Ordering::Relaxed) {
                crate::rw::log("Restart requested during Observe action, interrupting...");
                // Returning early will cause the observe task to be aborted when new pipeline starts
                return Ok((true, vec![]));
              }
            }
          }
        }
      } else {
        Ok((jfrx.recv().await.unwrap_or(false), vec![]))
      }
    } else {
      self.command.execute_observer(env, vars).await
    }
  }

  /// Converts observe action to shell script.
  pub async fn to_shell(
    &self,
    env: &RunEnvironment<'_>,
    vars: &BTreeMap<String, Variable>,
  ) -> anyhow::Result<Vec<String>> {
    self.command.to_shell(env, vars).await
  }
}