use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::confirm::Untrusted;
use crate::error::CoreError;
use crate::scope::Origin;
use crate::sensitivity::Sensitivity;
use crate::store;
pub const INTAKE_DIR: &str = "intake";
const INTAKE_EXT: &str = "json";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingIntake {
pub id: String,
pub created_unix: u64,
pub coordinate: String,
pub sensitivity: Sensitivity,
pub environment: String,
pub origin: Origin,
pub requesting_process: Option<String>,
pub description: Option<Untrusted>,
}
impl PendingIntake {
pub fn new(
coordinate: impl Into<String>,
sensitivity: Sensitivity,
environment: impl Into<String>,
origin: Origin,
) -> Self {
Self {
id: String::new(),
created_unix: 0,
coordinate: coordinate.into(),
sensitivity,
environment: environment.into(),
origin,
requesting_process: None,
description: None,
}
}
pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
self.requesting_process = Some(s.into());
self
}
pub fn with_description(mut self, text: impl Into<String>) -> Self {
self.description = Some(Untrusted(text.into()));
self
}
}
pub struct IntakeBroker {
dir: PathBuf,
counter: AtomicU64,
}
impl IntakeBroker {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self {
dir: dir.into(),
counter: AtomicU64::new(0),
}
}
pub fn under_root(root: &Path) -> Self {
Self::new(root.join(INTAKE_DIR))
}
fn intake_path(&self, id: &str) -> PathBuf {
self.dir.join(format!("{id}.{INTAKE_EXT}"))
}
fn next_id(&self) -> String {
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let n = self.counter.fetch_add(1, Ordering::SeqCst);
format!("{secs}-{}-{n}", std::process::id())
}
pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
intake.id = self.next_id();
intake.created_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
store::ensure_dir(&self.dir)?;
let bytes = serde_json::to_vec(&intake)
.map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
atomic_write(&self.intake_path(&intake.id), &bytes)?;
Ok(intake)
}
pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
let mut out = Vec::new();
if !self.dir.exists() {
return Ok(out);
}
let entries =
fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
for entry in entries {
let path = entry
.map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
.path();
if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
continue;
}
let bytes = match fs::read(&path) {
Ok(b) => b,
Err(_) => continue, };
if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
out.push(intake);
}
}
out.sort_by(|a, b| a.id.cmp(&b.id));
Ok(out)
}
pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
match fs::read(self.intake_path(id)) {
Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
.map(Some)
.map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
}
}
pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
match fs::remove_file(self.intake_path(id)) {
Ok(()) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
}
}
}
fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
let tmp = path.with_extension("tmp");
fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
store::restrict(&tmp, 0o600)?;
fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn broker(dir: &Path) -> IntakeBroker {
IntakeBroker::new(dir.join(INTAKE_DIR))
}
fn sample() -> PendingIntake {
PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
.with_requesting_process("node (pid 1234)")
.with_description("rotating the prod DB password")
}
#[test]
fn persisted_intake_has_no_value_field() {
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
let stored = b.create(sample()).unwrap();
let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
let obj = json.as_object().unwrap();
assert!(
!obj.contains_key("value"),
"intake must never persist a value (I11/I12/I14): {raw}"
);
assert_eq!(obj["coordinate"], "prod/db/password");
assert_eq!(obj["environment"], "prod");
}
#[cfg(unix)]
#[test]
fn intake_file_is_owner_only() {
use std::os::unix::fs::PermissionsExt;
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
let stored = b.create(sample()).unwrap();
let mode = fs::metadata(b.intake_path(&stored.id))
.unwrap()
.permissions()
.mode()
& 0o777;
assert_eq!(mode, 0o600, "intake file must be 0600");
}
#[test]
fn create_is_non_blocking_and_lists_authoritative_metadata() {
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
let stored = b.create(sample()).unwrap();
assert!(!stored.id.is_empty());
let pending = b.list_pending().unwrap();
assert_eq!(pending.len(), 1);
let p = &pending[0];
assert_eq!(p.coordinate, "prod/db/password");
assert_eq!(p.sensitivity, Sensitivity::High);
assert_eq!(p.environment, "prod");
assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
let desc = p.description.as_ref().unwrap();
assert!(format!("{desc}").contains("untrusted"));
}
#[test]
fn get_then_cancel_roundtrips() {
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
let stored = b.create(sample()).unwrap();
let got = b.get(&stored.id).unwrap().unwrap();
assert_eq!(got, stored);
assert!(b.cancel(&stored.id).unwrap());
assert!(b.get(&stored.id).unwrap().is_none());
assert!(b.list_pending().unwrap().is_empty());
assert!(!b.cancel(&stored.id).unwrap());
assert!(!b.cancel("no-such-id").unwrap());
}
#[test]
fn broker_never_couples_creation_to_value_capture() {
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
let stored = b.create(sample()).unwrap();
let serialized = serde_json::to_string(&stored).unwrap();
assert!(!serialized.contains("value"));
assert_eq!(b.list_pending().unwrap().len(), 1);
}
#[test]
fn list_pending_is_empty_when_no_dir() {
let tmp = tempfile::tempdir().unwrap();
let b = broker(tmp.path());
assert!(b.list_pending().unwrap().is_empty());
}
}