use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::confirm::{ConfirmOutcome, ConfirmRequest, Confirmer};
use crate::error::CoreError;
use crate::store;
pub const PENDING_DIR: &str = "pending";
const REQUEST_EXT: &str = "json";
const DECISION_EXT: &str = "decision";
const APPROVED: &str = "approved";
const DENIED: &str = "denied";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingRequest {
pub id: String,
pub created_unix: u64,
pub request: ConfirmRequest,
}
pub struct FileConfirmer {
dir: PathBuf,
poll: Duration,
counter: AtomicU64,
}
impl FileConfirmer {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self {
dir: dir.into(),
poll: Duration::from_millis(250),
counter: AtomicU64::new(0),
}
}
pub fn under_root(root: &Path) -> Self {
Self::new(root.join(PENDING_DIR))
}
pub fn with_poll(mut self, poll: Duration) -> Self {
self.poll = poll;
self
}
fn request_path(&self, id: &str) -> PathBuf {
self.dir.join(format!("{id}.{REQUEST_EXT}"))
}
fn decision_path(&self, id: &str) -> PathBuf {
self.dir.join(format!("{id}.{DECISION_EXT}"))
}
fn next_id(&self) -> String {
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let n = self.counter.fetch_add(1, Ordering::SeqCst);
format!("{secs}-{}-{n}", std::process::id())
}
pub fn list_pending(&self) -> Result<Vec<PendingRequest>, CoreError> {
let mut out = Vec::new();
if !self.dir.exists() {
return Ok(out);
}
let entries =
fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read pending dir: {e}")))?;
for entry in entries {
let path = entry
.map_err(|e| CoreError::Io(format!("pending entry: {e}")))?
.path();
if path.extension().and_then(|e| e.to_str()) != Some(REQUEST_EXT) {
continue;
}
let bytes = match fs::read(&path) {
Ok(b) => b,
Err(_) => continue, };
if let Ok(pending) = serde_json::from_slice::<PendingRequest>(&bytes) {
out.push(pending);
}
}
out.sort_by(|a, b| a.id.cmp(&b.id));
Ok(out)
}
pub fn approve(&self, id: &str) -> Result<bool, CoreError> {
self.decide(id, true)
}
pub fn deny(&self, id: &str) -> Result<bool, CoreError> {
self.decide(id, false)
}
fn decide(&self, id: &str, approved: bool) -> Result<bool, CoreError> {
if !self.request_path(id).exists() {
return Ok(false);
}
let body = if approved { APPROVED } else { DENIED };
atomic_write(&self.decision_path(id), body.as_bytes())?;
Ok(true)
}
fn cleanup(&self, id: &str) {
let _ = fs::remove_file(self.request_path(id));
let _ = fs::remove_file(self.decision_path(id));
}
}
impl Confirmer for FileConfirmer {
fn confirm(&self, req: &ConfirmRequest, timeout: Duration) -> ConfirmOutcome {
let id = self.next_id();
let created_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let pending = PendingRequest {
id: id.clone(),
created_unix,
request: req.clone(),
};
if store::ensure_dir(&self.dir).is_err() {
return ConfirmOutcome::Denied;
}
let Ok(bytes) = serde_json::to_vec(&pending) else {
return ConfirmOutcome::Denied;
};
if atomic_write(&self.request_path(&id), &bytes).is_err() {
return ConfirmOutcome::Denied;
}
let deadline = Instant::now() + timeout;
loop {
if let Ok(body) = fs::read_to_string(self.decision_path(&id)) {
let outcome = if body.trim() == APPROVED {
ConfirmOutcome::Approved
} else {
ConfirmOutcome::Denied
};
self.cleanup(&id);
return outcome;
}
if Instant::now() >= deadline {
self.cleanup(&id);
return ConfirmOutcome::TimedOut; }
thread::sleep(
self.poll
.min(deadline.saturating_duration_since(Instant::now())),
);
}
}
}
fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
let tmp = path.with_extension("tmp");
fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write pending: {e}")))?;
store::restrict(&tmp, 0o600)?;
fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename pending: {e}")))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scope::Origin;
use crate::sensitivity::Sensitivity;
fn req() -> ConfirmRequest {
ConfirmRequest::new("prod/db/password", Sensitivity::High, "prod", Origin::Human)
.with_command("/usr/bin/deploy --env prod")
}
fn broker(dir: &Path) -> FileConfirmer {
FileConfirmer::new(dir.join(PENDING_DIR)).with_poll(Duration::from_millis(10))
}
#[test]
fn approve_from_another_broker_yields_approved() {
let tmp = tempfile::tempdir().unwrap();
let runner = broker(tmp.path());
let approver = broker(tmp.path());
let handle = std::thread::spawn(move || {
loop {
let pending = approver.list_pending().unwrap();
if let Some(p) = pending.first() {
assert!(approver.approve(&p.id).unwrap());
break;
}
std::thread::yield_now();
}
});
let outcome = runner.confirm(&req(), Duration::from_secs(5));
handle.join().unwrap();
assert_eq!(outcome, ConfirmOutcome::Approved);
assert!(runner.list_pending().unwrap().is_empty());
}
#[test]
fn deny_from_another_broker_yields_denied() {
let tmp = tempfile::tempdir().unwrap();
let runner = broker(tmp.path());
let approver = broker(tmp.path());
let handle = std::thread::spawn(move || {
loop {
if let Some(p) = approver.list_pending().unwrap().first() {
assert!(approver.deny(&p.id).unwrap());
break;
}
std::thread::yield_now();
}
});
let outcome = runner.confirm(&req(), Duration::from_secs(5));
handle.join().unwrap();
assert_eq!(outcome, ConfirmOutcome::Denied);
}
#[test]
fn timeout_fails_safe_and_cleans_up() {
let tmp = tempfile::tempdir().unwrap();
let runner = broker(tmp.path());
let outcome = runner.confirm(&req(), Duration::from_millis(30));
assert_eq!(outcome, ConfirmOutcome::TimedOut);
assert!(!outcome.is_approved());
assert!(runner.list_pending().unwrap().is_empty());
}
#[test]
fn list_pending_surfaces_the_authoritative_request() {
let tmp = tempfile::tempdir().unwrap();
let runner = broker(tmp.path());
let lister = broker(tmp.path());
let handle = std::thread::spawn(move || {
loop {
let pending = lister.list_pending().unwrap();
if let Some(p) = pending.first() {
assert_eq!(p.request.coordinate, "prod/db/password");
assert_eq!(
p.request.resolved_command.as_deref(),
Some("/usr/bin/deploy --env prod")
);
assert_eq!(p.request.sensitivity, Sensitivity::High);
lister.approve(&p.id).unwrap();
break;
}
std::thread::yield_now();
}
});
let outcome = runner.confirm(&req(), Duration::from_secs(5));
handle.join().unwrap();
assert_eq!(outcome, ConfirmOutcome::Approved);
}
#[test]
fn approve_unknown_id_is_false() {
let tmp = tempfile::tempdir().unwrap();
let runner = broker(tmp.path());
assert!(!runner.approve("no-such-id").unwrap());
}
}