kovra-core 0.9.0

Core of kovra — local secrets manager for development: vault, sensitivity policy, providers, and the security invariants.
Documentation
//! Intake broker (KOV-69a) — the **inversion** of the approval broker
//! ([`crate::file_confirm`]).
//!
//! The approval broker answers *"an agent asked to USE a secret — may it?"*: the
//! requester blocks until a human approves in another process. The intake broker
//! answers the mirror question *"an agent asked to CREATE a secret — supply the
//! value"*: the agent's request is **non-blocking** and the value arrives later,
//! out-of-band, from a human-summoned fulfillment surface (CLI `kovra intake`,
//! the menu-bar app in KOV-69b).
//!
//! The point is the security boundary: the agent (via the MCP `request_secret`
//! tool) only ever names the **coordinate/sensitivity/environment** — never a
//! value. The value goes human → vault directly and never enters model context
//! (I11/I14). A [`PendingIntake`] therefore carries only authoritative
//! address/metadata, exactly like [`crate::file_confirm::PendingRequest`] — no
//! value field exists (I12). Files are `0600` under a `0700` dir, written
//! atomically (temp → rename).
//!
//! Sealing is **not** done here: this broker owns only the pending-intake
//! lifecycle (create / list / get / cancel) and has no vault coupling. The
//! fulfillment surface re-reads the pending intake, runs the attended
//! [`crate::confirm::Confirmer`] gate (I16), seals the human-supplied value
//! through the existing `set` path (born per I5), and then [`cancel`]s the
//! intake. Keeping the broker pure keeps its invariants legible and unit-testable
//! without a vault.
//!
//! Anti-phishing (KOV-69, structural): there is **no** method here that both
//! creates an intake and opens/reveals a value-capture surface. [`create`] only
//! writes a file; the capture surface is opened solely by a human action in the
//! fulfillment layer. The per-session identity nonce that the menu-bar surface
//! cross-checks (KOV-69b) is a property of that *surface*, not of an
//! agent-created record, so it deliberately does not live on [`PendingIntake`].
//!
//! [`create`]: IntakeBroker::create
//! [`cancel`]: IntakeBroker::cancel

use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};

use crate::confirm::Untrusted;
use crate::error::CoreError;
use crate::scope::Origin;
use crate::sensitivity::Sensitivity;
use crate::store;

/// The conventional pending-intake subdirectory under the registry root.
pub const INTAKE_DIR: &str = "intake";

const INTAKE_EXT: &str = "json";

/// A pending agent-initiated secret-creation request, persisted for the
/// human-summoned fulfillment surface (CLI `kovra intake list`, the menu-bar app).
///
/// Carries only the authoritative address/metadata the agent named — **never a
/// value** (I11/I12/I14). There is no `value` field by construction; the value is
/// supplied out-of-band at fulfillment and goes straight to the vault.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingIntake {
    /// The operator-typable intake id (`<unix>-<pid>-<n>`, same scheme as the
    /// approval broker).
    pub id: String,
    /// Seconds since the Unix epoch when the intake was created.
    pub created_unix: u64,
    /// The coordinate the value will be sealed under (an address, not a value).
    pub coordinate: String,
    /// Sensitivity the secret will be born with (a `prod` secret is born `high`,
    /// I5 — enforced at the seal, recorded here for the fulfillment prompt).
    pub sensitivity: Sensitivity,
    /// Environment (`prod` is highlighted by the renderer).
    pub environment: String,
    /// Who initiated the intake — weighs into the human's decision.
    pub origin: Origin,
    /// The requesting process / caller identity, a **trusted, observed fact** set
    /// by the FFI/CLI boundary (never from requester text). Carries no value.
    pub requesting_process: Option<String>,
    /// Optional requester free-text, segregated as untrusted (I16) — the agent's
    /// "why", never the authoritative line.
    pub description: Option<Untrusted>,
}

impl PendingIntake {
    /// Build an intake request from the authoritative fields. `id`/`created_unix`
    /// are placeholders until [`IntakeBroker::create`] stamps them.
    pub fn new(
        coordinate: impl Into<String>,
        sensitivity: Sensitivity,
        environment: impl Into<String>,
        origin: Origin,
    ) -> Self {
        Self {
            id: String::new(),
            created_unix: 0,
            coordinate: coordinate.into(),
            sensitivity,
            environment: environment.into(),
            origin,
            requesting_process: None,
            description: None,
        }
    }

    /// Attach the **trusted, observed** requesting-process identity (mirrors
    /// [`crate::confirm::ConfirmRequest::with_requesting_process`]).
    pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
        self.requesting_process = Some(s.into());
        self
    }

    /// Attach segregated, untrusted requester free-text.
    pub fn with_description(mut self, text: impl Into<String>) -> Self {
        self.description = Some(Untrusted(text.into()));
        self
    }
}

/// A file-backed queue of pending intakes under `<root>/intake/`.
pub struct IntakeBroker {
    dir: PathBuf,
    counter: AtomicU64,
}

impl IntakeBroker {
    /// A broker writing under `dir` (the intake directory itself).
    pub fn new(dir: impl Into<PathBuf>) -> Self {
        Self {
            dir: dir.into(),
            counter: AtomicU64::new(0),
        }
    }

    /// A broker rooted at a registry root: `<root>/intake/`.
    pub fn under_root(root: &Path) -> Self {
        Self::new(root.join(INTAKE_DIR))
    }

    fn intake_path(&self, id: &str) -> PathBuf {
        self.dir.join(format!("{id}.{INTAKE_EXT}"))
    }

    fn next_id(&self) -> String {
        let secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let n = self.counter.fetch_add(1, Ordering::SeqCst);
        format!("{secs}-{}-{n}", std::process::id())
    }

    /// Record a new pending intake and return it with its stamped id. This is the
    /// agent-facing entry point: it is **non-blocking** (unlike the approval
    /// broker's `confirm`) and persists only address/metadata — no value.
    pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
        intake.id = self.next_id();
        intake.created_unix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        store::ensure_dir(&self.dir)?;
        let bytes = serde_json::to_vec(&intake)
            .map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
        atomic_write(&self.intake_path(&intake.id), &bytes)?;
        Ok(intake)
    }

    /// Enumerate the open pending intakes, sorted by id (feeds `kovra intake list`
    /// and the menu-bar badge).
    pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
        let mut out = Vec::new();
        if !self.dir.exists() {
            return Ok(out);
        }
        let entries =
            fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
        for entry in entries {
            let path = entry
                .map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
                .path();
            if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
                continue;
            }
            let bytes = match fs::read(&path) {
                Ok(b) => b,
                Err(_) => continue, // racing cleanup — skip
            };
            if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
                out.push(intake);
            }
        }
        out.sort_by(|a, b| a.id.cmp(&b.id));
        Ok(out)
    }

    /// Read a single pending intake by id (the fulfillment surface re-reads the
    /// authoritative record before prompting). `None` if no such intake is open.
    pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
        match fs::read(self.intake_path(id)) {
            Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
                .map(Some)
                .map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
        }
    }

    /// Remove a pending intake (after it is fulfilled, or to cancel it). Returns
    /// `false` if no such intake was open.
    pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
        match fs::remove_file(self.intake_path(id)) {
            Ok(()) => Ok(true),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
            Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
        }
    }
}

/// Write `bytes` to `path` atomically (temp → rename), `0600` on Unix — mirrors
/// the approval broker's `atomic_write`, reusing `store::restrict` (the single
/// owner of kovra's on-disk permission policy).
fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
    store::restrict(&tmp, 0o600)?;
    fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    fn broker(dir: &Path) -> IntakeBroker {
        IntakeBroker::new(dir.join(INTAKE_DIR))
    }

    fn sample() -> PendingIntake {
        PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
            .with_requesting_process("node (pid 1234)")
            .with_description("rotating the prod DB password")
    }

    // I11/I14 — the persisted intake carries the address/metadata but NEVER a
    // value: there is no `value` field in the model, and none appears on disk.
    #[test]
    fn persisted_intake_has_no_value_field() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();

        let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
        let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
        let obj = json.as_object().unwrap();
        assert!(
            !obj.contains_key("value"),
            "intake must never persist a value (I11/I12/I14): {raw}"
        );
        // The authoritative address/metadata IS present.
        assert_eq!(obj["coordinate"], "prod/db/password");
        assert_eq!(obj["environment"], "prod");
    }

    // I12 — the intake file is owner-only (0600) on Unix, like the approval broker.
    #[cfg(unix)]
    #[test]
    fn intake_file_is_owner_only() {
        use std::os::unix::fs::PermissionsExt;
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();
        let mode = fs::metadata(b.intake_path(&stored.id))
            .unwrap()
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(mode, 0o600, "intake file must be 0600");
    }

    // create is non-blocking and list_pending surfaces the authoritative request;
    // the agent-supplied description is fenced as Untrusted, never authoritative.
    #[test]
    fn create_is_non_blocking_and_lists_authoritative_metadata() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        // create returns immediately (no Confirmer, no block) with a stamped id.
        let stored = b.create(sample()).unwrap();
        assert!(!stored.id.is_empty());

        let pending = b.list_pending().unwrap();
        assert_eq!(pending.len(), 1);
        let p = &pending[0];
        assert_eq!(p.coordinate, "prod/db/password");
        assert_eq!(p.sensitivity, Sensitivity::High);
        assert_eq!(p.environment, "prod");
        assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
        // The description is segregated as untrusted.
        let desc = p.description.as_ref().unwrap();
        assert!(format!("{desc}").contains("untrusted"));
    }

    // get re-reads the authoritative record; cancel removes it (the fulfillment
    // surface cancels after sealing). cancel of an unknown id is false.
    #[test]
    fn get_then_cancel_roundtrips() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();

        let got = b.get(&stored.id).unwrap().unwrap();
        assert_eq!(got, stored);

        assert!(b.cancel(&stored.id).unwrap());
        assert!(b.get(&stored.id).unwrap().is_none());
        assert!(b.list_pending().unwrap().is_empty());
        // cancelling again (or an unknown id) is a no-op false, not an error.
        assert!(!b.cancel(&stored.id).unwrap());
        assert!(!b.cancel("no-such-id").unwrap());
    }

    // Anti-phishing / human-summoned-only (structural): the broker has no API that
    // both creates an intake AND yields a value or opens a capture surface. The
    // only product of create() is a PendingIntake of pure metadata; the value can
    // only arrive out-of-band at the fulfillment layer. This test pins that the
    // created record is value-free and that completing an intake is a separate,
    // human-driven step (cancel after an out-of-band seal), not a broker method.
    #[test]
    fn broker_never_couples_creation_to_value_capture() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();
        // The returned record is pure metadata — there is no value to leak.
        let serialized = serde_json::to_string(&stored).unwrap();
        assert!(!serialized.contains("value"));
        // Nothing was sealed by create(): the intake is still pending until a
        // human fulfills it out-of-band.
        assert_eq!(b.list_pending().unwrap().len(), 1);
    }

    // list_pending tolerates a non-existent dir and non-json clutter.
    #[test]
    fn list_pending_is_empty_when_no_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        assert!(b.list_pending().unwrap().is_empty());
    }
}