use anyhow::{anyhow, Result};
use bytes::Bytes;
use clap::{Parser};
use clap_verbosity_flag::{Verbosity, WarnLevel};
use tokio::process::{Command};
use tokio::time::{sleep, Duration};
use async_nats::{jetstream};
/// Supervise a local process behind a lock stored in a NATS JetStream key-value bucket.
#[derive(Parser)]
struct Cli {
    // Flattened -v/-q flags from clap_verbosity_flag; WARN is the default level.
    #[command(flatten)]
    verbosity: Verbosity<WarnLevel>,
    /// Address of the NATS server to connect to
    #[arg(long)]
    nats: String,
    /// Key-value bucket that holds the lock (created when it cannot be opened)
    #[arg(long)]
    bucket: String,
    /// Key inside the bucket that stores the lock token
    #[arg(long)]
    key: String,
    /// Token written to the key to mark this instance as the lock holder
    #[arg(long)]
    token: String,
    /// Shell command run as `/bin/sh -c CMD healthcheck active|standby`; exit status 0 means healthy
    #[arg(long)]
    healthcheck: String,
    /// Shell command run as `/bin/sh -c CMD start active` once the lock is held
    #[arg(long)]
    start: String,
    /// Shell command run as `/bin/sh -c CMD kill` to stop the supervised process
    #[arg(long)]
    stop: String,
    /// Base interval R in milliseconds: minimum length of one round and the unit of every wait
    #[arg(short, default_value_t = 3000)]
    r: u64,
    /// Multiplier F: lock attempts per round; F*R is the back-off, takeover delay and health check timeout
    #[arg(short, default_value_t = 3)]
    f: u64,
    /// Multiplier C: C*R is the startup and error delay; the lock is re-asserted C times after a takeover
    #[arg(short, default_value_t = 2)]
    c: u64,
}
/// Runtime state of one supervisor instance: the parsed options, the JetStream
/// handle, and the most recently observed contents of the lock key.
struct Invocation {
/// Parsed command-line options.
args: Cli,
/// JetStream context; `None` until `setup` has connected.
js: Option<jetstream::Context>,
/// Revision of the lock key as last read by `update_lock_data`; 0 when the key has no entry.
latest_revision: u64,
/// Value of the lock key as last read by `update_lock_data`; empty when the key has no entry.
latest_value: Bytes,
/// Incremented on every call to `run`; used to label log lines.
starts: u64,
/// Checked at the top of every round in `run`; the loop ends when it is true.
/// NOTE(review): nothing in the visible code ever sets this to true.
exiting: bool,
/// Set to true by `unfence_and_enable_process` and to false by `kill_process`;
/// selects the "active"/"standby" argument passed to the health check command.
active: bool,
}
impl Invocation {
async fn new() -> Result<Self> {
let mut r = Invocation {
args: Cli::parse(),
js: None,
latest_revision: 0,
latest_value: "".into(),
starts: 0,
exiting: false,
active: false,
};
r.setup().await?;
Ok(r)
}
/// Opens the key-value bucket named by `--bucket`, creating it with default
/// settings when it cannot be opened.
///
/// # Errors
/// Fails when JetStream has not been connected yet (`js` is `None`) or when
/// the fallback bucket creation fails.
async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
    let js = self
        .js
        .as_ref()
        .ok_or_else(|| anyhow!("Jetstream not connected"))?;
    let bucket = &self.args.bucket;
    match js.get_key_value(bucket).await {
        Ok(store) => Ok(store),
        Err(e) => {
            // Every lookup failure ends up here, not only a missing bucket, so keep
            // the real cause available at debug level before trying to create it.
            log::debug!("opening bucket {bucket} failed: {e:?}");
            log::warn!("bucket {bucket} does not exist. attempting to create it");
            let store = js
                .create_key_value(jetstream::kv::Config {
                    bucket: bucket.clone(),
                    ..Default::default()
                })
                .await?;
            Ok(store)
        }
    }
}
async fn update_lock_data(&mut self) -> Result<()> {
let store = self.get_store().await?;
let e = store.entry(&self.args.key).await?;
match e {
Some(k) => {
self.latest_value = k.value.into();
self.latest_revision = k.revision;
}
None => {
self.latest_value = "".into();
self.latest_revision = 0;
}
};
Ok(())
}
async fn clear_lock(&mut self) -> Result<()> {
let store = self.get_store().await?;
store.update(&self.args.key, "".into(), self.latest_revision).await?;
self.update_lock_data().await?;
Ok(())
}
async fn take_lock(&mut self) -> Result<()> {
let store = self.get_store().await?;
store.update(&self.args.key, self.args.token.clone().into(), self.latest_revision).await?;
self.update_lock_data().await?;
Ok(())
}
async fn setup(&mut self) -> Result<()> {
let nc = async_nats::connect(&self.args.nats).await?;
println!("info: {:?}", nc.server_info());
self.js = Some(async_nats::jetstream::new(nc));
if self.args.verbosity.is_present() {
env_logger::Builder::new().filter_level(self.args.verbosity.log_level_filter()).init();
} else {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
}
log::error!("error level active");
log::warn!("warn level active");
log::info!("info level active");
log::debug!("debug level active");
log::trace!("trace level active");
Ok(())
}
/// Runs one cycle of the lock state machine.
///
/// The cached lock data is refreshed first, then one of two paths is taken:
///
/// * Lock is empty or holds our token: if the health check passes, our token is
///   written (up to `f` attempts) and the start command is run. If the health
///   check fails or every attempt fails, the stop command is run, the lock is
///   cleared (best effort) and the cycle waits `f * r` milliseconds.
/// * Lock holds a different token: the stop command is run; if two health
///   checks `f * r` milliseconds apart both pass, the lock is taken and then
///   re-asserted `c` times at `r` millisecond intervals.
///
/// # Errors
/// Returns an error when refreshing the lock data fails, when a health check,
/// start or (in the takeover path) stop command cannot be spawned or waited on,
/// or when re-asserting the lock after a takeover fails.
async fn pump(&mut self) -> Result<()> {
self.update_lock_data().await?;
if self.latest_value == "" || self.latest_value == self.args.token { log::trace!("lk empty or mine? - yes - Found an empty lock or my own token present");
if self.healthcheck().await? {
log::trace!("health check succeeded");
let max_i = self.args.f;
// NOTE(review): take_lock only refreshes the cached revision after a successful
// write, so every retry below reuses the same revision as the failed attempt.
// Confirm the retries are meant to cover transient errors only.
for i in 1..=max_i {
if self.take_lock().await.is_ok() {
self.unfence_and_enable_process().await?;
log::trace!("successfully unfenced and enabled process. ending pump cycle.");
return Ok(());
} else {
log::trace!("failed to take lock. attempt {i}/{max_i}");
}
}
} else {
log::info!("sunny side health check failed");
}
// Reached when the health check failed or every lock attempt failed:
// stop the process and give the lock up. Both steps are best effort here;
// failures are logged and do not abort the cycle.
log::trace!("attempting to clear lock and kill process");
match self.kill_process().await {
Ok(()) => {
log::info!("killed process");
}
e => {
log::info!("failed to kill process: {e:?}");
}
}
match self.clear_lock().await {
Ok(()) => {
log::info!("cleared lock");
}
e => {
log::info!("failed to clear lock (will not retry but will wait 1R extra): {e:?}");
self.wait_r(1).await;
}
}
log::trace!("waiting F*R");
self.wait_r(self.args.f).await;
return Ok(());
} else {
// Takeover path: another token is in the lock. The stop command is run first;
// unlike the branch above, a spawn/wait failure here aborts the cycle via `?`.
log::trace!("lk empty or mine? - no - Found a different token in the lock");
self.kill_process().await?;
log::trace!("kill_process asserted");
if !self.healthcheck().await? {
log::trace!("initial takeover health check failed. not eligible for takeover. ending pump cycle.");
return Ok(());
}
log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
self.wait_r(self.args.f).await;
// Second health check after the F*R wait; both must pass before the lock is attempted.
if !self.healthcheck().await? {
log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
return Ok(());
}
log::trace!("confirmation takeover health check succeeded.");
if self.take_lock().await.is_ok() {
log::info!("lock taken. waiting C for other services to die.");
// Re-write the token once per R for C rounds; a failed re-assertion aborts the cycle.
for _ in 1..=self.args.c {
self.wait_r(1).await;
self.take_lock().await?;
log::trace!("lock asserted while waiting for C");
}
} else {
log::trace!("failed to take lock. ending cycle");
return Ok(());
}
}
// Only the takeover path reaches this point; the other branch always returns earlier.
log::trace!("ending pump cycle normally.");
Ok(())
}
async fn run(&mut self) -> Result<()> {
let mut round = 0;
self.starts = self.starts + 1;
let this_start = self.starts;
self.wait_r(self.args.c).await;
loop {
round = round + 1;
if self.exiting {
log::info!("exiting before round {round}");
break;
}
let round_timer = sleep(Duration::from_millis(self.args.r));
match self.pump().await {
Ok(()) => {
log::info!("pump {this_start}.{round}: success");
}
e => {
self.wait_r(self.args.c).await;
log::error!("pump {this_start}.{round}: error: {e:?}");
}
}
round_timer.await;
}
Ok(())
}
/// Marks this instance inactive and runs the stop command as
/// `/bin/sh -c <stop> kill`.
///
/// A non-zero exit status of the command is only logged; the call still
/// returns `Ok(())`. WARN-level messages are emitted only when the instance
/// was active before the call, so repeated stops stay at TRACE.
///
/// # Errors
/// Returns an error when the shell cannot be spawned or waited on.
async fn kill_process(&mut self) -> Result<()> {
    log::trace!("attempting to kill process");
    let was_active = self.active;
    self.active = false;
    if was_active {
        log::warn!("killing process");
    }
    let stopped = Command::new("/bin/sh")
        .arg("-c")
        .arg(&self.args.stop)
        .arg("kill")
        .spawn()?
        .wait()
        .await?
        .success();
    if stopped {
        log::trace!("process killed successfully");
        // Warn only on a real active -> inactive transition. The earlier check
        // read `self.active`, which is always false at this point, so the
        // warning fired on every successful call.
        if was_active {
            log::warn!("process killed successfully");
        }
    } else {
        log::warn!("kill process failed");
    }
    Ok(())
}
/// Marks this instance active and runs the start command as
/// `/bin/sh -c <start> start active`.
///
/// A non-zero exit status of the command is only logged; the call still
/// returns `Ok(())`. WARN-level messages are emitted only when the instance
/// was not active before the call, so repeated starts stay at TRACE.
///
/// # Errors
/// Returns an error when the shell cannot be spawned or waited on.
async fn unfence_and_enable_process(&mut self) -> Result<()> {
    log::trace!("attempting to unfence and enable process");
    let was_active = self.active;
    self.active = true;
    if !was_active {
        log::warn!("starting process");
    }
    let started = Command::new("/bin/sh")
        .arg("-c")
        .arg(&self.args.start)
        .arg("start")
        // `self.active` was set to true above, so the role argument is always "active".
        .arg("active")
        .spawn()?
        .wait()
        .await?
        .success();
    if started {
        log::trace!("process started successfully");
        // Warn only on a real inactive -> active transition. The earlier check
        // read `self.active`, which is always true at this point, so the
        // warning could never fire.
        if !was_active {
            log::warn!("process started successfully");
        }
    } else {
        log::warn!("unfence/enable process failed");
    }
    Ok(())
}
async fn healthcheck(&mut self) -> Result<bool> {
let mut child = Command::new("/bin/sh")
.arg("-c")
.arg(self.args.healthcheck.clone())
.arg("healthcheck")
.arg(if self.active {"active"} else {"standby"})
.spawn()?;
tokio::select! {
status = child.wait() => {
Ok(status?.success())
}
_ = sleep(Duration::from_millis(self.args.r * self.args.f)) => {
log::warn!("health check timed out.");
Ok(false)
}
}
}
/// Sleeps for `count` base intervals, i.e. `r * count` milliseconds.
async fn wait_r(&mut self, count: u64) {
    // Saturate rather than overflow: a wrapped product would turn a very long
    // wait into a very short one in release builds and panic in debug builds.
    let millis = self.args.r.saturating_mul(count);
    sleep(Duration::from_millis(millis)).await;
}
}
/// Entry point: builds the supervisor (argument parsing, NATS connection and
/// logger installation all happen inside `Invocation::new`) and runs its loop.
#[tokio::main]
async fn main() -> Result<()> {
    // No logger exists before `Invocation::new` returns, so a log call placed
    // ahead of it would be dropped silently.
    let mut inv = Invocation::new().await?;
    log::info!("here we are");
    let rr = inv.run().await;
    log::info!("{rr:?}");
    // Return the loop's result so a failure produces a non-zero exit status
    // instead of being logged and discarded.
    rr
}