use std::collections::HashMap;
use std::io::{Error, Read as IoRead, Result};
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use run_script::ScriptOptions;
use wait_timeout::ChildExt;
use clokwerk::{Scheduler, TimeUnits};
use git2::Repository;
use crate::git::{self, CommitMetadata};
#[derive(Debug, Clone)]
pub struct Repo {
url: String,
username: Option<String>,
token: Option<String>,
local_path: Option<String>,
branch: String,
command: Option<String>,
delay: u16,
verbosity: u8,
exec_on_start: bool,
exit_on_first_diff: bool,
timeout: u64,
}
impl Repo {
pub fn builder(url: impl Into<String>) -> RepoBuilder {
RepoBuilder::new(url)
}
pub fn url(&self) -> &str {
&self.url
}
pub fn set_url(&mut self, url: String) {
self.url = url;
}
pub fn username(&self) -> Option<&str> {
self.username.as_deref()
}
pub fn token(&self) -> Option<&str> {
self.token.as_deref()
}
pub fn local_path(&self) -> Option<&str> {
self.local_path.as_deref()
}
pub fn set_local_path(&mut self, path: String) {
self.local_path = Some(path);
}
pub fn branch(&self) -> &str {
&self.branch
}
pub fn command(&self) -> Option<&str> {
self.command.as_deref()
}
#[allow(dead_code)]
pub fn delay(&self) -> u16 {
self.delay
}
pub fn verbosity(&self) -> u8 {
self.verbosity
}
#[allow(dead_code)]
pub fn exec_on_start(&self) -> bool {
self.exec_on_start
}
pub fn exit_on_first_diff(&self) -> bool {
self.exit_on_first_diff
}
pub fn timeout(&self) -> u64 {
self.timeout
}
}
#[derive(Debug, Clone)]
pub struct RepoBuilder {
url: String,
username: Option<String>,
token: Option<String>,
local_path: Option<String>,
branch: String,
command: Option<String>,
delay: u16,
verbosity: u8,
exec_on_start: bool,
exit_on_first_diff: bool,
timeout: u64,
}
impl RepoBuilder {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
username: None,
token: None,
local_path: None,
branch: String::from("main"),
command: None,
delay: 120,
verbosity: 1,
exec_on_start: false,
exit_on_first_diff: false,
timeout: 0,
}
}
pub fn username(mut self, username: impl Into<String>) -> Self {
let u = username.into();
self.username = if u.is_empty() { None } else { Some(u) };
self
}
pub fn token(mut self, token: impl Into<String>) -> Self {
let t = token.into();
self.token = if t.is_empty() { None } else { Some(t) };
self
}
pub fn local_path(mut self, path: impl Into<String>) -> Self {
self.local_path = Some(path.into());
self
}
pub fn branch(mut self, branch: impl Into<String>) -> Self {
self.branch = branch.into();
self
}
pub fn command(mut self, command: impl Into<String>) -> Self {
let c = command.into();
self.command = if c.is_empty() { None } else { Some(c) };
self
}
pub fn delay(mut self, delay: u16) -> Self {
self.delay = delay;
self
}
pub fn verbosity(mut self, verbosity: u8) -> Self {
self.verbosity = verbosity;
self
}
pub fn exec_on_start(mut self, exec: bool) -> Self {
self.exec_on_start = exec;
self
}
pub fn exit_on_first_diff(mut self, exit: bool) -> Self {
self.exit_on_first_diff = exit;
self
}
pub fn timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
pub fn build(self) -> Repo {
Repo {
url: self.url,
username: self.username,
token: self.token,
local_path: self.local_path,
branch: self.branch,
command: self.command,
delay: self.delay,
verbosity: self.verbosity,
exec_on_start: self.exec_on_start,
exit_on_first_diff: self.exit_on_first_diff,
timeout: self.timeout,
}
}
}
impl Repo {
pub fn clone_repo(&self) -> Result<()> {
let local_path = self
.local_path
.as_ref()
.ok_or_else(|| Error::other("local_path is not set"))?;
let local_target = str::replace(local_path, "//", "/");
match Repository::clone(&self.url, local_target) {
Ok(_repo) => {
if self.verbosity > 0 {
info!("cloned remote repo to {}", local_path);
}
Ok(())
}
Err(e) => {
let msg = format!("goa error: failed to clone -> {}", e);
Err(Error::other(msg))
}
}
}
pub fn spy_for_changes(&self) -> Result<()> {
if self.verbosity > 0 {
info!("checking for diffs every {} seconds", self.delay);
}
let mut scheduler = Scheduler::new();
let delay = self.delay as u32;
let cloned_repo = Arc::new(Mutex::new(self.clone()));
if self.exec_on_start {
let repo_guard = cloned_repo
.lock()
.map_err(|e| Error::other(format!("Failed to acquire lock: {}", e)))?;
match do_process_once(&repo_guard) {
Ok(()) => {
if self.verbosity > 0 {
info!("exec on startup complete");
}
}
Err(e) => {
return Err(Error::other(format!("failed to exec on startup: {}", e)));
}
}
}
scheduler.every(delay.seconds()).run(move || {
match cloned_repo.lock() {
Ok(repo_guard) => {
if let Err(e) = do_process(&repo_guard) {
eprintln!("goa error: unable to process repo: {}", e);
}
}
Err(e) => {
eprintln!("goa error: failed to acquire lock: {}", e);
}
}
});
loop {
scheduler.run_pending();
thread::sleep(Duration::from_millis(10));
}
}
}
pub fn read_goa_file(goa_path: &str) -> String {
if std::path::Path::new(goa_path).exists() {
std::fs::read_to_string(goa_path).unwrap_or_else(|_| {
String::from("echo 'failed to read .goa file'")
})
} else {
String::from("echo 'no goa file found yet'")
}
}
pub fn execute_command(
command: &str,
working_dir: &str,
env_vars: Option<HashMap<String, String>>,
timeout: u64,
verbosity: u8,
) -> Result<String> {
if verbosity > 1 {
info!("running -> {}", command);
if timeout > 0 {
info!("timeout -> {} seconds", timeout);
}
}
if verbosity > 2 {
debug!("path -> {}", working_dir);
}
if timeout > 0 {
return execute_command_with_timeout(command, working_dir, env_vars, timeout, verbosity);
}
let mut options = ScriptOptions::new();
options.working_directory = Some(PathBuf::from(working_dir));
options.env_vars = env_vars;
let args = vec![];
let (code, output, error) = run_script::run(command, &args, &options)
.map_err(|e| Error::other(format!("Failed to run script: {}", e)))?;
if verbosity > 1 {
info!("command status: {}", code);
if !error.is_empty() {
info!("command stderr:\n{}", error);
}
}
if !error.is_empty() {
eprintln!("{}", error);
}
if code != 0 {
return Err(Error::other(format!("Command exited with code {}", code)));
}
Ok(output)
}
fn execute_command_with_timeout(
command: &str,
working_dir: &str,
env_vars: Option<HashMap<String, String>>,
timeout: u64,
verbosity: u8,
) -> Result<String> {
let timeout_duration = Duration::from_secs(timeout);
let mut cmd = Command::new("sh");
cmd.arg("-c")
.arg(command)
.current_dir(working_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(vars) = env_vars {
for (key, value) in vars {
cmd.env(key, value);
}
}
let mut child = cmd
.spawn()
.map_err(|e| Error::other(format!("Failed to spawn command: {}", e)))?;
match child.wait_timeout(timeout_duration) {
Ok(Some(status)) => {
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout)
.map_err(|e| Error::other(format!("Failed to read stdout: {}", e)))?;
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr)
.map_err(|e| Error::other(format!("Failed to read stderr: {}", e)))?;
}
let code = status.code().unwrap_or(-1);
if verbosity > 1 {
info!("command status: {}", code);
if !stderr.is_empty() {
info!("command stderr:\n{}", stderr);
}
}
if !stderr.is_empty() {
eprintln!("{}", stderr);
}
if code != 0 {
return Err(Error::other(format!("Command exited with code {}", code)));
}
Ok(stdout)
}
Ok(None) => {
if verbosity > 0 {
warn!("Command timed out after {} seconds, killing process", timeout);
}
child
.kill()
.map_err(|e| Error::other(format!("Failed to kill timed-out process: {}", e)))?;
child.wait().ok();
Err(Error::other(format!(
"Command timed out after {} seconds",
timeout
)))
}
Err(e) => Err(Error::other(format!("Failed to wait on command: {}", e))),
}
}
fn get_effective_command(repo: &Repo, local_path: &str) -> String {
match repo.command() {
Some(cmd) => cmd.to_string(),
None => read_goa_file(&format!("{}/.goa", local_path)),
}
}
pub fn do_process_once(repo: &Repo) -> Result<()> {
let local_path = repo
.local_path()
.ok_or_else(|| Error::other("local_path is not set"))?;
let local_repo = Repository::open(local_path).map_err(|e| {
Error::other(format!("goa error: failed to open the cloned repo: {}", e))
})?;
let metadata = git::get_last_commit_metadata(&local_repo, repo.branch(), repo.verbosity())
.map_err(|e| Error::other(format!("branch '{}' not found: {}", repo.branch(), e)))?;
let effective_command = get_effective_command(repo, local_path);
if repo.verbosity() > 2 {
debug!("effective command: {}", effective_command);
}
match do_task(repo, &effective_command, Some(&metadata)) {
Ok(output) => {
if repo.verbosity() > 0 {
info!("command stdout: {}", output);
} else {
println!("{output}");
}
}
Err(e) => {
eprintln!("goa error: do_task error {}", e);
}
}
Ok(())
}
pub fn do_process(repo: &Repo) -> Result<()> {
let local_path = repo
.local_path()
.ok_or_else(|| Error::other("local_path is not set"))?;
let local_repo = Repository::open(local_path).map_err(|e| {
Error::other(format!("goa error: failed to open the cloned repo: {}", e))
})?;
if repo.verbosity() > 1 {
info!("checking for diffs at origin/{}!", repo.branch());
}
match git::is_diff(&local_repo, "origin", repo.branch(), repo.verbosity()) {
Ok(commit) => {
match git::do_merge(&local_repo, repo.branch(), commit, repo.verbosity()) {
Ok(metadata) => {
let effective_command = get_effective_command(repo, local_path);
if repo.verbosity() > 2 {
debug!("effective command: {}", effective_command);
}
match do_task(repo, &effective_command, Some(&metadata)) {
Ok(output) => {
if repo.verbosity() > 0 {
info!("command stdout: {}", output);
} else {
println!("{output}");
}
if repo.exit_on_first_diff() {
std::process::exit(0);
}
}
Err(e) => {
eprintln!("goa error: do_task error {}", e);
}
}
}
Err(e) => {
eprintln!("goa error: do_merge error {}", e);
}
}
}
Err(e) => {
if repo.verbosity() > 1 {
debug!("{}", e);
}
}
}
Ok(())
}
fn do_task(repo: &Repo, command: &str, metadata: Option<&CommitMetadata>) -> Result<String> {
let local_path = repo
.local_path()
.ok_or_else(|| Error::other("local_path is not set"))?;
if repo.verbosity() > 1 {
info!("running -> {}", command);
if repo.timeout() > 0 {
info!("timeout -> {} seconds", repo.timeout());
}
}
if repo.verbosity() > 2 {
debug!("path -> {}", local_path);
}
if repo.timeout() > 0 {
return do_task_with_timeout(repo, command, metadata, local_path);
}
let mut options = ScriptOptions::new();
options.working_directory = Some(PathBuf::from(local_path));
if let Some(meta) = metadata {
options.env_vars = Some(meta.to_env_vars());
}
let args = vec![];
let (code, output, error) = run_script::run(command, &args, &options)
.map_err(|e| Error::other(format!("Failed to run script: {}", e)))?;
if repo.verbosity() > 1 {
info!("command status: {}", code);
info!("command stderr:\n{}", error);
}
if !error.is_empty() {
eprintln!("{}", error);
std::process::exit(code);
}
Ok(output)
}
fn do_task_with_timeout(
repo: &Repo,
command: &str,
metadata: Option<&CommitMetadata>,
local_path: &str,
) -> Result<String> {
let timeout_duration = Duration::from_secs(repo.timeout());
let mut cmd = Command::new("sh");
cmd.arg("-c")
.arg(command)
.current_dir(local_path)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(meta) = metadata {
for (key, value) in meta.to_env_vars() {
cmd.env(key, value);
}
}
let mut child = cmd
.spawn()
.map_err(|e| Error::other(format!("Failed to spawn command: {}", e)))?;
match child.wait_timeout(timeout_duration) {
Ok(Some(status)) => {
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout)
.map_err(|e| Error::other(format!("Failed to read stdout: {}", e)))?;
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr)
.map_err(|e| Error::other(format!("Failed to read stderr: {}", e)))?;
}
let code = status.code().unwrap_or(-1);
if repo.verbosity() > 1 {
info!("command status: {}", code);
info!("command stderr:\n{}", stderr);
}
if !stderr.is_empty() {
eprintln!("{}", stderr);
std::process::exit(code);
}
Ok(stdout)
}
Ok(None) => {
if repo.verbosity() > 0 {
warn!(
"Command timed out after {} seconds, killing process",
repo.timeout()
);
}
child
.kill()
.map_err(|e| Error::other(format!("Failed to kill timed-out process: {}", e)))?;
child.wait().ok();
Err(Error::other(format!(
"Command timed out after {} seconds",
repo.timeout()
)))
}
Err(e) => Err(Error::other(format!("Failed to wait on command: {}", e))),
}
}
#[cfg(test)]
mod repos_tests {
use super::*;
use std::io::ErrorKind;
#[test]
fn test_creation_of_repo() {
let repo = Repo::builder("file://.")
.local_path(".")
.branch("develop")
.build();
assert_eq!("develop", repo.branch());
}
#[test]
fn test_builder_defaults() {
let repo = Repo::builder("https://github.com/test/repo").build();
assert_eq!("main", repo.branch());
assert_eq!(120, repo.delay());
assert_eq!(1, repo.verbosity());
assert!(!repo.exec_on_start());
assert!(!repo.exit_on_first_diff());
assert_eq!(0, repo.timeout());
}
#[test]
fn test_do_task() {
let repo = Repo::builder("file://.")
.local_path(".")
.branch("develop")
.command("echo hello")
.verbosity(3)
.build();
let res = do_task(&repo, "echo hello", None);
assert_eq!(String::from("hello\n"), res.unwrap());
}
#[test]
fn test_do_process() -> Result<()> {
let temp_dir = std::env::temp_dir();
let mut local_path: String = temp_dir.into_os_string().into_string().unwrap();
let tmp_dir_name = format!("/{}/", uuid::Uuid::new_v4());
local_path.push_str(&tmp_dir_name);
let repo = Repo::builder("https://github.com/kitplummer/goa_tester")
.local_path(&local_path)
.branch("main")
.command("echo hello")
.verbosity(2)
.build();
repo.clone_repo()?;
assert_eq!(do_process(&repo)?, ());
Ok(())
}
#[test]
fn test_do_process_no_clone() -> Result<()> {
let temp_dir = std::env::temp_dir();
let mut local_path: String = temp_dir.into_os_string().into_string().unwrap();
let tmp_dir_name = format!("/{}/", uuid::Uuid::new_v4());
local_path.push_str(&tmp_dir_name);
let mut repo = Repo::builder("https://github.com/kitplummer/goa_tester")
.local_path(&local_path)
.branch("main")
.command("echo hello")
.verbosity(2)
.build();
repo.clone_repo()?;
repo.set_local_path(String::from("/blahdyblahblah"));
let res = do_process(&repo).unwrap_err();
assert_eq!(res.kind(), ErrorKind::Other);
Ok(())
}
#[test]
fn test_do_process_no_command() -> Result<()> {
let temp_dir = std::env::temp_dir();
let mut local_path: String = temp_dir.into_os_string().into_string().unwrap();
let tmp_dir_name = format!("/{}/", uuid::Uuid::new_v4());
local_path.push_str(&tmp_dir_name);
println!("local_path: {:?}", local_path);
let repo = Repo::builder("https://github.com/kitplummer/goa_tester")
.local_path(&local_path)
.branch("main")
.verbosity(3)
.build();
repo.clone_repo()?;
assert_eq!(do_process(&repo)?, ());
Ok(())
}
#[test]
fn test_no_goa_file() {
let res = read_goa_file("/blahdy/.goa");
assert_eq!(res, String::from("echo 'no goa file found yet'"));
}
#[test]
fn test_do_task_with_timeout() {
let repo = Repo::builder("file://.")
.local_path(".")
.branch("develop")
.command("echo hello")
.verbosity(3)
.timeout(5)
.build();
let res = do_task(&repo, "echo hello", None);
assert_eq!(String::from("hello\n"), res.unwrap());
}
#[test]
fn test_do_task_timeout_exceeded() {
let repo = Repo::builder("file://.")
.local_path(".")
.branch("develop")
.command("sleep 10")
.verbosity(1)
.timeout(1)
.build();
let res = do_task(&repo, "sleep 10", None);
assert!(res.is_err());
assert!(res.unwrap_err().to_string().contains("timed out"));
}
}