use crate::CacheKey;
use crate::CommandArgument;
use crate::CommandKind;
use crate::CommandOutput;
use crate::PathMapper;
use async_recursion::async_recursion;
pub use bstr;
use bstr::BString;
use bstr::ByteSlice;
use chrono::DateTime;
use chrono::Local;
use chrono::TimeDelta;
use cloud_terrastodon_pathing::AppDir;
use cloud_terrastodon_pathing::Existy;
use cloud_terrastodon_relative_location::RelativeLocation;
use eyre::Context;
use eyre::Result;
use eyre::bail;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::ffi::OsString;
use std::future::Future;
use std::panic::Location;
use std::path::Path;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tempfile::Builder;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::task::spawn_blocking;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::debug;
use tracing::debug_span;
use tracing::error;
use tracing::info;
use tracing::info_span;
use tracing::warn;
/// Marker for types that can be produced by deserializing a command's stdout.
///
/// Every `DeserializeOwned + Send + 'static` type qualifies automatically; the
/// `Send + 'static` bounds are what allow `run_from` to parse inside
/// `spawn_blocking`.
pub trait FromCommandOutput: DeserializeOwned + Send + 'static {}
impl<T: DeserializeOwned + Send + 'static> FromCommandOutput for T {}
/// How a command reacts when it fails with a recognised transient error
/// (throttling or an expired/missing login).
#[derive(Debug, Default, Clone, Copy)]
pub enum RetryBehaviour {
    /// Report the failure immediately.
    Fail,
    /// Wait out throttling or refresh credentials, then retry once. This is the default.
    #[default]
    Retry,
}
/// Where the child process's standard streams go.
#[derive(Debug, Default, Clone, Copy)]
pub enum OutputBehaviour {
    /// Inherit the terminal so the user sees the output live; stdin is piped only
    /// when content is sent.
    Display,
    /// Pipe stdin, stdout and stderr so the output can be cached and parsed. This is
    /// the default.
    #[default]
    Capture,
}
/// Describes one external command invocation and how it should be run.
///
/// Configure it with the chaining setters (`arg`, `env`, `cache`, `use_timeout`, ...)
/// and execute it with `run`, `run_raw` or `run_with_validator`.
#[derive(Debug, Default, Clone)]
pub struct CommandBuilder {
/// Program to invoke (resolved through `CommandKind::program`).
pub(crate) kind: CommandKind,
/// Arguments in order; `DeferredAdjacentFilePath` entries refer to keys of `adjacent_files`.
pub(crate) args: Vec<CommandArgument>,
/// Extra files keyed by path; compared against the cache and dumped on failure.
pub(crate) adjacent_files: HashMap<PathBuf, BString>,
/// Environment variables recorded via `env` (presumably applied by
/// `CommandKind::apply_args_and_envs` — confirm there).
pub(crate) env: HashMap<String, String>,
/// Working directory for the child; `None` keeps the current directory.
pub(crate) run_dir: Option<PathBuf>,
/// Whether throttling/auth failures trigger a single retry.
pub(crate) retry_behaviour: RetryBehaviour,
/// Whether the child's stdio is captured or shown in the terminal.
pub(crate) output_behaviour: OutputBehaviour,
/// Where successful output is cached; `None` disables caching.
pub(crate) cache_key: Option<CacheKey>,
/// Log the launch at `info` instead of `debug`.
pub(crate) should_announce: bool,
/// Maximum time to wait for the child; `None` waits indefinitely.
pub(crate) timeout: Option<Duration>,
/// Text written to the child's stdin after it is spawned.
pub(crate) stdin_content: Option<String>,
}
/// Process-wide lock that serialises interactive re-authentication.
///
/// Initialised lazily on first use. While one command holds it to run `az login`,
/// other commands that hit an auth failure wait on it instead of starting their own
/// login.
static LOGIN_LOCK: OnceCell<Arc<Mutex<()>>> = OnceCell::const_new();
impl CommandBuilder {
pub fn new(kind: CommandKind) -> CommandBuilder {
let mut cmd = CommandBuilder::default();
cmd.use_command(kind);
cmd
}
pub fn use_command(&mut self, kind: CommandKind) -> &mut Self {
self.kind = kind;
self
}
/// Marks this command's cache entry as stale by creating a `busted` file in its
/// cache directory. An existing marker is left as-is (not truncated).
///
/// # Errors
/// Fails when no cache key is configured or the marker cannot be created (for
/// example because the cache directory does not exist).
pub async fn bust_cache(&self) -> Result<()> {
    let cache_key = match &self.cache_key {
        Some(key) => key,
        None => bail!("no cache entry present"),
    };
    let marker = cache_key.path_on_disk().join("busted");
    OpenOptions::new()
        .create(true)
        .truncate(false)
        .write(true)
        .open(&marker)
        .await
        .wrap_err_with(|| {
            format!(
                "failed creating busted cache indicator at {}",
                marker.display(),
            )
        })?;
    Ok(())
}
#[track_caller]
pub fn cache(&mut self, key: CacheKey) -> &mut Self {
self.cache_key = Some(key);
self
}
#[track_caller]
pub fn use_cache(&mut self, key: Option<CacheKey>) -> &mut Self {
self.cache_key = key;
self
}
pub fn use_run_dir(&mut self, dir: impl AsRef<Path>) -> &mut Self {
self.run_dir = Some(dir.as_ref().to_path_buf());
self
}
pub fn use_retry_behaviour(&mut self, behaviour: RetryBehaviour) -> &mut Self {
self.retry_behaviour = behaviour;
self
}
pub fn use_output_behaviour(&mut self, behaviour: OutputBehaviour) -> &mut Self {
self.output_behaviour = behaviour;
self
}
pub fn use_timeout(&mut self, timeout: Duration) -> &mut Self {
self.timeout = Some(timeout);
self
}
/// Appends each item of `args` as a literal argument, in order.
pub fn args<I, S>(&mut self, args: I) -> &mut Self
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>,
{
    args.into_iter().for_each(|item| {
        self.arg(item);
    });
    self
}
/// Appends a single literal argument.
pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
    let literal = arg.as_ref().to_os_string();
    self.args.push(CommandArgument::Literal(literal));
    self
}
/// Registers an extra file for this invocation, keyed by `path`.
///
/// Its content is part of cache validation and is dumped alongside failures.
pub fn adjacent_file<P: Into<PathBuf>, C: Into<BString>>(
    &mut self,
    path: P,
    content: C,
) -> &mut Self {
    let (key, body) = (path.into(), content.into());
    self.adjacent_files.insert(key, body);
    self
}
/// Registers `content` as an adjacent file at `path` and appends an argument that
/// will be rendered through `mapper` when the command is built.
pub fn file_arg<S: AsRef<Path>>(
    &mut self,
    path: S,
    mapper: impl PathMapper,
    content: String,
) -> &mut Self {
    let key = PathBuf::from(path.as_ref());
    self.adjacent_files.insert(key.clone(), content.into());
    self.args.push(CommandArgument::DeferredAdjacentFilePath {
        key,
        mapper: Arc::new(mapper),
    });
    self
}
/// Like `file_arg`, but the argument is rendered with an `@` prefix (the Azure CLI
/// "read value from file" convention).
pub fn azure_file_arg<S: AsRef<Path>>(&mut self, path: S, content: String) -> &mut Self {
    let at_prefix = crate::PrefixPathMapper { prefix: "@".into() };
    self.file_arg(path, at_prefix, content)
}
/// Records an environment variable for the command, replacing any earlier value for
/// the same key.
pub fn env(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
    let (name, setting) = (key.as_ref().to_owned(), value.as_ref().to_owned());
    self.env.insert(name, setting);
    self
}
/// Chooses whether launching the command is logged at `info` (true) or `debug` (false).
pub fn should_announce(&mut self, value: bool) -> &mut Self {
    let builder = self;
    builder.should_announce = value;
    builder
}
/// Renders a one-line, human-readable description: the program followed by its
/// space-separated arguments.
///
/// For Azure CLI commands the summary always contains `--debug`, which is appended
/// when the argument list does not already include it. This string is also what
/// the cache stores in `context.txt`.
pub async fn summarize(&self) -> String {
    let mut rendered = self.args.clone();
    let needs_debug_flag = self.kind == CommandKind::AzureCLI
        && !rendered
            .iter()
            .any(|a| matches!(a, CommandArgument::Literal(lit) if lit == "--debug"));
    if needs_debug_flag {
        rendered.push(CommandArgument::Literal("--debug".into()));
    }
    let pieces = rendered.into_iter().map(OsString::from).collect::<Vec<_>>();
    let program = self.kind.program().await;
    let joined = pieces.join(&OsString::from(" "));
    format!("{} {}", program, joined.to_string_lossy())
}
pub async fn get_cached_output(&self) -> Result<Option<CommandOutput>> {
let start = Instant::now();
let Some(cache_key) = &self.cache_key else {
debug!("Cache behaviour is None, not using cache");
return Ok(None);
};
let valid_for = &cache_key.valid_for;
let cache_dir = cache_key.path_on_disk();
if valid_for.is_zero() {
debug!("Cache validity duration is zero, not using cache");
return Ok(None);
}
if !cache_dir.exists() {
debug!("Cache directory does not exist, not using cache");
return Ok(None);
}
let load_from_pathbuf = async |path: &PathBuf| -> Result<BString> {
let path = cache_dir.join(path);
let mut file = OpenOptions::new()
.read(true)
.open(&path)
.await
.context(format!("opening cache file {}", path.display()))?;
let mut file_contents = Vec::new();
file.read_to_end(&mut file_contents)
.await
.context(format!("reading cache file {}", path.display()))?;
let file_contents = BString::from(file_contents);
Ok(file_contents)
};
let load_from_path = async |path: &str| -> Result<BString> {
let span = debug_span!("Reading command cache from disk");
span.record("path", path);
load_from_pathbuf(&PathBuf::from(path))
.instrument(span.or_current())
.await
};
if !matches!(
tokio::fs::try_exists(cache_dir.join("busted")).await,
Ok(false)
) {
debug!("Cache is busted");
return Ok(None);
}
let expect_files: [(&PathBuf, &BString); 1] = [
(
&PathBuf::from("context.txt"),
&self.summarize().await.into(),
),
];
let mut expect_files = Vec::from_iter(expect_files);
for (adj_path, adj_content) in self.adjacent_files.iter() {
expect_files.push((adj_path, adj_content));
}
for (path, expected_contents) in expect_files {
let file_contents = load_from_pathbuf(path).await?;
if file_contents != *expected_contents {
debug!(
path=%path.display(),
found=%file_contents,
expected=%expected_contents,
"Not using cache due to expected content mismatch. Did Cloud Terrastodon change what command is being called?",
);
return Ok(None);
}
}
let timestamp = load_from_path("timestamp.txt").await?;
let timestamp_first_line = timestamp
.lines()
.next()
.ok_or_else(|| eyre::eyre!("timestamp.txt contained no lines"))?;
let timestamp_first_line = timestamp_first_line
.to_str()
.wrap_err("failed to convert timestamp first line to string")?;
let timestamp = DateTime::parse_from_rfc2822(timestamp_first_line).wrap_err_with(|| {
format!("failed to parse timestamp from '{}'", timestamp_first_line)
})?;
let now = Local::now();
let time_remaining = if *valid_for == Duration::MAX {
TimeDelta::MAX
} else {
timestamp + *valid_for - now.fixed_offset()
};
if time_remaining < TimeDelta::zero() {
debug!(
%timestamp,
valid_for_seconds = valid_for.as_secs(),
expired_for_seconds = time_remaining.abs().num_seconds(),
"Cache entry has expired (was from {}, was valid for {}, expired {} ago)",
timestamp,
humantime::format_duration(*valid_for),
humantime::format_duration(time_remaining.abs().to_std().unwrap()),
);
return Ok(None);
}
let status: i32 = load_from_path("status.txt").await?.to_str()?.parse()?;
let stdout = load_from_path("stdout.json").await?;
let stderr = load_from_path("stderr.json").await?;
let elapsed = Instant::now().duration_since(start);
debug!(
%timestamp,
valid_for_seconds = valid_for.as_secs(),
remaining_seconds = time_remaining.num_seconds(),
cache_load_ms = elapsed.as_millis(),
"Loaded command output from cache in {}",
humantime::format_duration(elapsed),
);
Ok(Some(CommandOutput {
status,
stdout,
stderr,
}))
}
/// Persists `output` and this command's summary into `parent_dir`.
///
/// `context.txt`, `stdout.json`, `stderr.json` and `status.txt` are rewritten
/// (truncated). The current RFC 2822 time is appended as a new line of
/// `timestamp.txt`, and that file is written last. Any `busted` marker is removed
/// only after every file has been written. A write that fails part-way therefore
/// leaves a busted entry busted instead of exposing half-written files.
///
/// # Errors
/// Fails if the directory cannot be created, if any file cannot be opened, written
/// or flushed, or if the marker cannot be removed.
pub async fn write_output(&self, output: &CommandOutput, parent_dir: &PathBuf) -> Result<()> {
    debug!(path = %parent_dir.display(), "Writing command results");
    parent_dir.ensure_dir_exists().await?;
    let summary = self.summarize().await;
    let status = output.status.to_string();
    let timestamp_line = format!("{}\n", Local::now().to_rfc2822());
    let overwritten: [(&str, &[u8]); 4] = [
        ("context.txt", summary.as_bytes()),
        ("stdout.json", output.stdout.as_slice()),
        ("stderr.json", output.stderr.as_slice()),
        ("status.txt", status.as_bytes()),
    ];
    for (file_name, contents) in overwritten {
        let path = parent_dir.join(file_name);
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .await
            .wrap_err_with(|| format!("opening file {}", path.to_string_lossy()))?;
        file.write_all(contents)
            .await
            .wrap_err_with(|| format!("writing file {}", path.to_string_lossy()))?;
        // tokio finishes file writes in the background; flushing waits for them and
        // surfaces any error instead of losing it when the handle is dropped.
        file.flush()
            .await
            .wrap_err_with(|| format!("writing file {}", path.to_string_lossy()))?;
    }
    // timestamp.txt keeps a history: one appended line per write.
    let timestamp_path = parent_dir.join("timestamp.txt");
    let mut timestamp_file = OpenOptions::new()
        .append(true)
        .create(true)
        .open(&timestamp_path)
        .await
        .wrap_err_with(|| format!("opening file {}", timestamp_path.to_string_lossy()))?;
    timestamp_file
        .write_all(timestamp_line.as_bytes())
        .await
        .wrap_err_with(|| format!("writing file {}", timestamp_path.to_string_lossy()))?;
    timestamp_file
        .flush()
        .await
        .wrap_err_with(|| format!("writing file {}", timestamp_path.to_string_lossy()))?;
    let busted_path = parent_dir.join("busted");
    if let Ok(true) = tokio::fs::try_exists(&busted_path).await {
        tokio::fs::remove_file(&busted_path)
            .await
            .context("Removing busted cache marker")?;
    }
    Ok(())
}
/// Sends `content` to the child's stdin once it has been spawned (stdin is piped
/// whenever content is set).
pub fn send_stdin(&mut self, content: impl Into<String>) -> &mut Self {
    let payload: String = content.into();
    self.stdin_content = Some(payload);
    self
}
async fn run_raw_inner(&self, caller: &'static Location<'static>) -> Result<CommandOutput> {
let mut command = Command::new(self.kind.program().await);
match self.output_behaviour {
OutputBehaviour::Capture => {
command.stdin(Stdio::piped()); command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
}
OutputBehaviour::Display => {
if self.stdin_content.is_some() {
command.stdin(Stdio::piped()); }
}
}
let _temp_files = self
.kind
.apply_args_and_envs(self, &mut command)
.await
.context("applying args and envs")?;
if let Some(ref dir) = self.run_dir {
command.current_dir(dir);
}
if self.should_announce {
info!("Executing command");
} else {
debug!("Executing command");
}
command.kill_on_drop(true);
let mut child = command.spawn().wrap_err("Failed to spawn command")?;
if let Some(content) = &self.stdin_content
&& let Some(mut stdin) = child.stdin.take()
{
let content = content.to_owned();
tokio::spawn(async move {
if let Err(e) = stdin.write_all(content.as_bytes()).await {
error!("Failed to write to stdin: {:?}", e);
}
});
}
let timeout_duration = self.timeout.unwrap_or(Duration::MAX);
let output: CommandOutput = match timeout(timeout_duration, child.wait_with_output()).await
{
Ok(result) => result
.wrap_err("Acquiring result of command execution")?
.try_into()
.wrap_err("Converting output of command")?,
Err(elapsed) => {
bail!(
"Command timeout, {elapsed:?} ({})",
humantime::format_duration(timeout_duration)
);
}
};
if !output.success() {
match self.retry_behaviour {
RetryBehaviour::Retry
if [
"ERROR: Too Many Requests",
"Error: Too Many Requests"
]
.into_iter()
.any(|x| output.stderr.contains_str(x)) =>
{
let mut sleep_duration = Duration::from_secs(30);
let needle = "'x-ms-user-quota-resets-after': '";
if let Some(pos) = output.stderr.find(needle) {
let start = pos + needle.len();
if let Some(end) = output.stderr[start..].find("'") {
let reset_after_str = String::from_utf8_lossy(&output.stderr[start..start + end]);
let parts = reset_after_str.split(':').map(|x| x.parse::<u64>()).collect::<Result<Vec<_>, _>>()?;
sleep_duration = match parts.as_slice() {
[hh, mm, ss] => {
Duration::from_secs(hh * 3600 + mm * 60 + ss) + Duration::from_secs(5)
}
_ => sleep_duration,
};
}
}
warn!("Rate limit detected ⏳ Retrying command after {sleep_duration:?} wait...");
tokio::time::sleep(sleep_duration).await;
info!("It's been {sleep_duration:?}, retrying command `{}`", self.summarize().await);
let mut retry = self.clone();
retry.use_retry_behaviour(RetryBehaviour::Fail);
let output = retry.run_raw_from(caller).await;
return output;
},
RetryBehaviour::Retry
if [
"AADSTS70043",
"No subscription found. Run 'az account set' to select a subscription.",
"Please run 'az login' to setup account.",
"ERROR: (pii). Status: Response_Status.Status_InteractionRequired, Error code: 3399614467",
]
.into_iter()
.any(|x| output.stderr.contains_str(x)) =>
{
if std::env::var("CLOUD_TERRASTODON_REAUTH").unwrap_or_default().to_uppercase() == "DENY" {
bail!("Command failed due to bad auth, and automatic reauthentication is disabled by the CLOUD_TERRASTODON_REAUTH environment variable. Please refresh your credentials and try again.")
}
let mutex = LOGIN_LOCK
.get_or_init(async || Arc::new(Mutex::new(())))
.await;
match mutex.try_lock() {
Ok(x) => {
debug!(
"Acquired login lock without waiting, there isn't a login in progress"
);
warn!(
"Command failed due to bad auth. Refreshing credential, user action required in a moment..."
);
let tenant_id = CommandBuilder::new(CommandKind::AzureCLI)
.args([
"account",
"list",
"--query",
"[?isDefault].tenantId",
"--output",
"tsv",
])
.run_raw_from(caller)
.await?
.stdout;
let tenant_id = tenant_id.trim();
if tenant_id.is_empty() {
warn!(
"Failed to find tenant ID from default account, the login command without tenant ID has been flaky for me .-. trying anyways"
);
CommandBuilder::new(CommandKind::AzureCLI)
.arg("login")
.run_raw_from(caller)
.await?;
} else {
CommandBuilder::new(CommandKind::AzureCLI)
.args([
"login",
"--tenant",
tenant_id
.to_str()
.wrap_err("converting tenant id to str")?,
])
.run_raw_from(caller)
.await?;
}
drop(x);
}
Err(_) => {
debug!("Login lock busy, waiting for the login to complete");
warn!(
"Command failed due to bad auth. Waiting for login in progress..."
);
_ = mutex.lock().await;
}
}
info!("Retrying command with refreshed credential...");
let mut retry = self.clone();
retry.use_retry_behaviour(RetryBehaviour::Fail);
let output = retry.run_raw_from(caller).await;
return output;
}
_ => {
let dir = self.write_failure(&output).await?;
let mut error = Err(eyre::Error::from(output).wrap_err(format!(
"Command did not execute successfully, using retry behaviour {:?}, dumped to {dir:?}",
self.retry_behaviour
)));
if matches!(self.output_behaviour, OutputBehaviour::Display) {
error = error.wrap_err(format!(
"The output behaviour was set to {:?} instead of {:?} so the stdout and stderr are not available in the dump, try scrolling up in your terminal.",
OutputBehaviour::Display,
OutputBehaviour::Capture,
));
}
return error;
}
}
}
if output.success()
&& let Some(cache_key) = &self.cache_key
&& let Err(e) = self.write_output(&output, &cache_key.path_on_disk()).await
{
error!("Encountered problem saving cache: {:?}", e);
}
Ok(output)
}
/// Runs the command and returns its raw output without parsing.
///
/// The caller's source location is recorded and attached to any error.
#[track_caller]
pub fn run_raw(&self) -> impl Future<Output = Result<CommandOutput>> + Send + '_ {
    let caller = Location::caller();
    self.run_raw_from(caller)
}
#[async_recursion]
async fn run_raw_from(&self, caller: &'static Location<'static>) -> Result<CommandOutput> {
let summary = self.summarize().await;
let span =
info_span!("command_run_raw", summary, ?self.run_dir, ?self.cache_key).or_current();
async {
match self.get_cached_output().instrument(span.clone()).await {
Ok(None) => {}
Ok(Some(output)) => {
return Ok(output);
}
Err(error) => {
debug!(?self.cache_key, %error, "Cache load failed");
}
}
let start = Instant::now();
let rtn = self.run_raw_inner(caller).instrument(span.clone()).await;
let elapsed = Instant::now().duration_since(start);
debug!(
elapsed_ms = elapsed.as_millis(),
"Command executed in {}",
humantime::format_duration(elapsed),
);
rtn
}
.instrument(span.clone())
.await
.wrap_err(format!(
"Command::run_raw failed, called from {}",
RelativeLocation::from(caller)
))
.wrap_err(format!("Invoking command failed: {summary}",))
}
/// Runs the command and deserializes its stdout as JSON into `T`.
///
/// The caller's source location is recorded and attached to any error.
#[track_caller]
pub fn run<T: FromCommandOutput>(&self) -> impl Future<Output = Result<T>> + Send + '_ {
    let caller = Location::caller();
    self.run_from(caller)
}
async fn run_from<T: FromCommandOutput>(
&self,
caller: &'static Location<'static>,
) -> Result<T> {
let summary = self.summarize().await;
let span = info_span!("command_run", summary, ?self.run_dir, ?self.cache_key).or_current();
let output = self
.run_raw_from(caller)
.instrument(span.clone())
.await
.wrap_err(format!(
"Command::run failed, called from {}",
RelativeLocation::from(caller)
))?;
let output = Arc::new(output);
let parse_result = {
let output = Arc::clone(&output);
let span = span.clone();
spawn_blocking(move || {
let _guard = span.enter();
let span2 = info_span!("command_parse_output").or_current();
let _guard2 = span2.enter();
let start = Instant::now();
let stdout = output.stdout.to_str_lossy();
let slice = stdout.as_bytes();
let parse_result = serde_json::from_slice(slice);
let elapsed = Instant::now().duration_since(start);
debug!(
parse_ms = elapsed.as_millis(),
"Parsed command output in {}",
humantime::format_duration(elapsed),
);
parse_result
})
.await?
};
match parse_result {
Ok(results) => Ok(results),
Err(e) => {
let dir = self
.write_failure(&output)
.instrument(span.or_current())
.await?;
Err(eyre::Error::new(e)
.wrap_err(format!(
"Deserialization failed!\n - Command: `{summary}`\n - Called by: \"{}\"\n - Dumped to: {dir:?}\n - Type: {}",
RelativeLocation::from(caller),
std::any::type_name::<T>()
)))
}
}
}
/// Runs the command, deserializes stdout into `T`, then passes the value through
/// `validator`.
///
/// The caller's source location is recorded and attached to any error.
#[track_caller]
pub fn run_with_validator<T, F>(
    &self,
    validator: F,
) -> impl Future<Output = Result<T>> + Send + '_
where
    T: FromCommandOutput,
    F: FnOnce(T) -> Result<T> + Send + 'static,
{
    let caller = Location::caller();
    self.run_with_validator_from(validator, caller)
}
/// Runs the command, deserializes stdout as JSON into `T`, and applies `validator`.
///
/// `caller` is forwarded to `run_raw_from`, so failures of the run itself point at
/// the real call site. The previous `#[track_caller]` `run_raw()` call recorded
/// this line instead.
///
/// # Errors
/// Fails if the command fails, if stdout does not deserialize into `T`, or if
/// `validator` rejects the value. In the last two cases the output is first dumped
/// with `write_failure`, and the error mentions the dump directory.
pub async fn run_with_validator_from<T, F>(
    &self,
    validator: F,
    caller: &'static Location<'static>,
) -> Result<T>
where
    T: FromCommandOutput,
    F: FnOnce(T) -> Result<T>,
{
    let output = self.run_raw_from(caller).await?;
    match serde_json::from_slice(&output.stdout) {
        Ok(parsed) => match validator(parsed) {
            Ok(validated) => Ok(validated),
            Err(e) => {
                let dir = self.write_failure(&output).await?;
                Err(e.wrap_err(format!("Encountered validation error after successful invocation of `{}`\ncalled by \"{}\"\ndumped to {:?}",
                    self.summarize().await,
                    RelativeLocation::from(caller),
                    dir
                )))
            }
        },
        Err(e) => {
            let dir = self.write_failure(&output).await?;
            Err(eyre::Error::new(e).wrap_err(format!(
                "deserializing `{}` failed\ncalled by \"{}\"\ndumped to {:?}",
                self.summarize().await,
                RelativeLocation::from(caller),
                dir
            )))
        }
    }
}
pub async fn write_failure(&self, output: &CommandOutput) -> Result<PathBuf> {
let dir = match &self.cache_key {
None => AppDir::Commands.join("failed"),
Some(cache_key) => {
let cache_dir = cache_key.path_on_disk();
cache_dir.join("failed")
}
};
dir.ensure_dir_exists().await?;
let dir = Builder::new()
.prefix(Local::now().format("%Y%m%d_%H%M%S_").to_string().as_str())
.tempdir_in(dir)?
.keep();
self.write_output(output, &dir).await?;
for (adj_path, adj_content) in self.adjacent_files.iter() {
let path = dir.join(adj_path);
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.await
.context(format!("Opening arg file {}", path.display()))?;
file.write_all(adj_content.as_bytes())
.await
.context(format!("Writing arg file {}", path.display()))?;
}
Ok(dir)
}
}