// kovra_core/intake.rs

//! Intake broker (KOV-69a) — the **inversion** of the approval broker
//! ([`crate::file_confirm`]).
//!
//! The approval broker answers *"an agent asked to USE a secret — may it?"*: the
//! requester blocks until a human approves in another process. The intake broker
//! answers the mirror question *"an agent asked to CREATE a secret — supply the
//! value"*: the agent's request is **non-blocking** and the value arrives later,
//! out-of-band, from a human-summoned fulfillment surface (CLI `kovra intake`,
//! the menu-bar app in KOV-69b).
//!
//! The point is the security boundary: the agent (via the MCP `request_secret`
//! tool) only ever names the **coordinate/sensitivity/environment** — never a
//! value. The value goes human → vault directly and never enters model context
//! (I11/I14). A [`PendingIntake`] therefore carries only authoritative
//! address/metadata, exactly like [`crate::file_confirm::PendingRequest`] — no
//! value field exists (I12). Files are `0600` under a `0700` dir, written
//! atomically (temp → rename).
//!
//! Sealing is **not** done here: this broker owns only the pending-intake
//! lifecycle (create / list / get / cancel) and has no vault coupling. The
//! fulfillment surface re-reads the pending intake, runs the attended
//! [`crate::confirm::Confirmer`] gate (I16), seals the human-supplied value
//! through the existing `set` path (born per I5), and then [`cancel`]s the
//! intake. Keeping the broker pure keeps its invariants legible and unit-testable
//! without a vault.
//!
//! Anti-phishing (KOV-69, structural): there is **no** method here that both
//! creates an intake and opens/reveals a value-capture surface. [`create`] only
//! writes a file; the capture surface is opened solely by a human action in the
//! fulfillment layer. The per-session identity nonce that the menu-bar surface
//! cross-checks (KOV-69b) is a property of that *surface*, not of an
//! agent-created record, so it deliberately does not live on [`PendingIntake`].
//!
//! [`create`]: IntakeBroker::create
//! [`cancel`]: IntakeBroker::cancel

37use std::fs;
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use serde::{Deserialize, Serialize};
43
44use crate::confirm::Untrusted;
45use crate::error::CoreError;
46use crate::scope::Origin;
47use crate::sensitivity::Sensitivity;
48use crate::store;
49
50/// The conventional pending-intake subdirectory under the registry root.
51pub const INTAKE_DIR: &str = "intake";
52
53const INTAKE_EXT: &str = "json";
54
55/// A pending agent-initiated secret-creation request, persisted for the
56/// human-summoned fulfillment surface (CLI `kovra intake list`, the menu-bar app).
57///
58/// Carries only the authoritative address/metadata the agent named — **never a
59/// value** (I11/I12/I14). There is no `value` field by construction; the value is
60/// supplied out-of-band at fulfillment and goes straight to the vault.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct PendingIntake {
63    /// The operator-typable intake id (`<unix>-<pid>-<n>`, same scheme as the
64    /// approval broker).
65    pub id: String,
66    /// Seconds since the Unix epoch when the intake was created.
67    pub created_unix: u64,
68    /// The coordinate the value will be sealed under (an address, not a value).
69    pub coordinate: String,
70    /// Sensitivity the secret will be born with (a `prod` secret is born `high`,
71    /// I5 — enforced at the seal, recorded here for the fulfillment prompt).
72    pub sensitivity: Sensitivity,
73    /// Environment (`prod` is highlighted by the renderer).
74    pub environment: String,
75    /// Who initiated the intake — weighs into the human's decision.
76    pub origin: Origin,
77    /// The requesting process / caller identity, a **trusted, observed fact** set
78    /// by the FFI/CLI boundary (never from requester text). Carries no value.
79    pub requesting_process: Option<String>,
80    /// Optional requester free-text, segregated as untrusted (I16) — the agent's
81    /// "why", never the authoritative line.
82    pub description: Option<Untrusted>,
83    /// Optional destination **project vault** the value should be sealed into
84    /// (`None` ⇒ the global vault). An address hint, never a value — it just
85    /// routes which vault the fulfillment seals into and preselects the project
86    /// in the native fulfill window. `#[serde(default)]` keeps intakes written
87    /// before this field deserializable.
88    #[serde(default)]
89    pub project: Option<String>,
90}
91
92impl PendingIntake {
93    /// Build an intake request from the authoritative fields. `id`/`created_unix`
94    /// are placeholders until [`IntakeBroker::create`] stamps them.
95    pub fn new(
96        coordinate: impl Into<String>,
97        sensitivity: Sensitivity,
98        environment: impl Into<String>,
99        origin: Origin,
100    ) -> Self {
101        Self {
102            id: String::new(),
103            created_unix: 0,
104            coordinate: coordinate.into(),
105            sensitivity,
106            environment: environment.into(),
107            origin,
108            requesting_process: None,
109            description: None,
110            project: None,
111        }
112    }
113
114    /// Attach the **trusted, observed** requesting-process identity (mirrors
115    /// [`crate::confirm::ConfirmRequest::with_requesting_process`]).
116    pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
117        self.requesting_process = Some(s.into());
118        self
119    }
120
121    /// Attach segregated, untrusted requester free-text.
122    pub fn with_description(mut self, text: impl Into<String>) -> Self {
123        self.description = Some(Untrusted(text.into()));
124        self
125    }
126
127    /// Attach the destination project vault (`None` ⇒ global). Routes which vault
128    /// the fulfillment seals into and preselects it in the fulfill window.
129    pub fn with_project(mut self, project: Option<String>) -> Self {
130        self.project = project.filter(|p| !p.is_empty());
131        self
132    }
133}
134
/// A file-backed queue of pending intakes under `<root>/intake/`.
///
/// Each pending intake is one JSON file in `dir`, named after the intake id.
/// The broker holds no other state than the directory path and a counter used
/// to mint ids.
#[derive(Debug)]
pub struct IntakeBroker {
    /// The intake directory itself (not the registry root).
    dir: PathBuf,
    /// Per-instance sequence number; supplies the trailing `<n>` of each
    /// generated id.
    counter: AtomicU64,
}

141impl IntakeBroker {
142    /// A broker writing under `dir` (the intake directory itself).
143    pub fn new(dir: impl Into<PathBuf>) -> Self {
144        Self {
145            dir: dir.into(),
146            counter: AtomicU64::new(0),
147        }
148    }
149
150    /// A broker rooted at a registry root: `<root>/intake/`.
151    pub fn under_root(root: &Path) -> Self {
152        Self::new(root.join(INTAKE_DIR))
153    }
154
155    fn intake_path(&self, id: &str) -> PathBuf {
156        self.dir.join(format!("{id}.{INTAKE_EXT}"))
157    }
158
159    fn next_id(&self) -> String {
160        let secs = SystemTime::now()
161            .duration_since(UNIX_EPOCH)
162            .map(|d| d.as_secs())
163            .unwrap_or(0);
164        let n = self.counter.fetch_add(1, Ordering::SeqCst);
165        format!("{secs}-{}-{n}", std::process::id())
166    }
167
168    /// Record a new pending intake and return it with its stamped id. This is the
169    /// agent-facing entry point: it is **non-blocking** (unlike the approval
170    /// broker's `confirm`) and persists only address/metadata — no value.
171    pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
172        intake.id = self.next_id();
173        intake.created_unix = SystemTime::now()
174            .duration_since(UNIX_EPOCH)
175            .map(|d| d.as_secs())
176            .unwrap_or(0);
177        store::ensure_dir(&self.dir)?;
178        let bytes = serde_json::to_vec(&intake)
179            .map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
180        atomic_write(&self.intake_path(&intake.id), &bytes)?;
181        Ok(intake)
182    }
183
184    /// Enumerate the open pending intakes, sorted by id (feeds `kovra intake list`
185    /// and the menu-bar badge).
186    pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
187        let mut out = Vec::new();
188        if !self.dir.exists() {
189            return Ok(out);
190        }
191        let entries =
192            fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
193        for entry in entries {
194            let path = entry
195                .map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
196                .path();
197            if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
198                continue;
199            }
200            let bytes = match fs::read(&path) {
201                Ok(b) => b,
202                Err(_) => continue, // racing cleanup — skip
203            };
204            if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
205                out.push(intake);
206            }
207        }
208        out.sort_by(|a, b| a.id.cmp(&b.id));
209        Ok(out)
210    }
211
212    /// Read a single pending intake by id (the fulfillment surface re-reads the
213    /// authoritative record before prompting). `None` if no such intake is open.
214    pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
215        match fs::read(self.intake_path(id)) {
216            Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
217                .map(Some)
218                .map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
219            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
220            Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
221        }
222    }
223
224    /// Remove a pending intake (after it is fulfilled, or to cancel it). Returns
225    /// `false` if no such intake was open.
226    pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
227        match fs::remove_file(self.intake_path(id)) {
228            Ok(()) => Ok(true),
229            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
230            Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
231        }
232    }
233}
234
235/// Write `bytes` to `path` atomically (temp → rename), `0600` on Unix — mirrors
236/// the approval broker's `atomic_write`, reusing `store::restrict` (the single
237/// owner of kovra's on-disk permission policy).
238fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
239    let tmp = path.with_extension("tmp");
240    fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
241    store::restrict(&tmp, 0o600)?;
242    fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
243    Ok(())
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// A broker over `<dir>/intake`, matching what `under_root` would build.
    fn broker(dir: &Path) -> IntakeBroker {
        IntakeBroker::new(dir.join(INTAKE_DIR))
    }

    /// A representative agent-originated prod intake with a requesting process
    /// and an (untrusted) description attached.
    fn sample() -> PendingIntake {
        PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
            .with_requesting_process("node (pid 1234)")
            .with_description("rotating the prod DB password")
    }

    // I11/I14 — the persisted intake carries the address/metadata but NEVER a
    // value: there is no `value` field in the model, and none appears on disk.
    #[test]
    fn persisted_intake_has_no_value_field() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();

        let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
        let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
        let obj = json.as_object().unwrap();
        assert!(
            !obj.contains_key("value"),
            "intake must never persist a value (I11/I12/I14): {raw}"
        );
        // The authoritative address/metadata IS present.
        assert_eq!(obj["coordinate"], "prod/db/password");
        assert_eq!(obj["environment"], "prod");
    }

    // KOV-69 (project wiring) — an agent-named destination project round-trips
    // through create → list/get, and an intake written before the field existed
    // (no `project` key) still deserializes (serde default ⇒ None). The project is
    // an address hint (routes the destination vault + preselects the fulfill
    // dropdown), never a value.
    #[test]
    fn project_roundtrips_and_is_serde_backward_compatible() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());

        // With a project: it survives persistence and is read back.
        let stored = b
            .create(sample().with_project(Some("payments".to_string())))
            .unwrap();
        assert_eq!(stored.project.as_deref(), Some("payments"));
        assert_eq!(
            b.get(&stored.id).unwrap().unwrap().project.as_deref(),
            Some("payments")
        );
        // An empty project name normalises to None (global vault).
        assert_eq!(
            sample().with_project(Some(String::new())).project,
            None,
            "empty project ⇒ global (None)"
        );

        // Backward compat: an older intake JSON with no `project` key still loads.
        let legacy = r#"{"id":"1-2-0","created_unix":0,"coordinate":"dev/app/token",
            "sensitivity":"medium","environment":"dev","origin":"agent",
            "requesting_process":null,"description":null}"#;
        let parsed: PendingIntake = serde_json::from_str(legacy).unwrap();
        assert_eq!(parsed.project, None);
    }

    // I12 — the intake file is owner-only (0600) on Unix, like the approval broker.
    #[cfg(unix)]
    #[test]
    fn intake_file_is_owner_only() {
        use std::os::unix::fs::PermissionsExt;
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();
        let mode = fs::metadata(b.intake_path(&stored.id))
            .unwrap()
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(mode, 0o600, "intake file must be 0600");
    }

    // create is non-blocking and list_pending surfaces the authoritative request;
    // the agent-supplied description is fenced as Untrusted, never authoritative.
    #[test]
    fn create_is_non_blocking_and_lists_authoritative_metadata() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        // create returns immediately (no Confirmer, no block) with a stamped id.
        let stored = b.create(sample()).unwrap();
        assert!(!stored.id.is_empty());

        let pending = b.list_pending().unwrap();
        assert_eq!(pending.len(), 1);
        let p = &pending[0];
        assert_eq!(p.coordinate, "prod/db/password");
        assert_eq!(p.sensitivity, Sensitivity::High);
        assert_eq!(p.environment, "prod");
        assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
        // The description is segregated as untrusted.
        let desc = p.description.as_ref().unwrap();
        assert!(format!("{desc}").contains("untrusted"));
    }

    // get re-reads the authoritative record; cancel removes it (the fulfillment
    // surface cancels after sealing). cancel of an unknown id is false.
    #[test]
    fn get_then_cancel_roundtrips() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();

        let got = b.get(&stored.id).unwrap().unwrap();
        assert_eq!(got, stored);

        assert!(b.cancel(&stored.id).unwrap());
        assert!(b.get(&stored.id).unwrap().is_none());
        assert!(b.list_pending().unwrap().is_empty());
        // cancelling again (or an unknown id) is a no-op false, not an error.
        assert!(!b.cancel(&stored.id).unwrap());
        assert!(!b.cancel("no-such-id").unwrap());
    }

    // Anti-phishing / human-summoned-only (structural): the broker has no API that
    // both creates an intake AND yields a value or opens a capture surface. The
    // only product of create() is a PendingIntake of pure metadata; the value can
    // only arrive out-of-band at the fulfillment layer. This test pins that the
    // created record is value-free and that completing an intake is a separate,
    // human-driven step (cancel after an out-of-band seal), not a broker method.
    #[test]
    fn broker_never_couples_creation_to_value_capture() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();
        // The returned record is pure metadata — there is no value to leak.
        let serialized = serde_json::to_string(&stored).unwrap();
        assert!(!serialized.contains("value"));
        // Nothing was sealed by create(): the intake is still pending until a
        // human fulfills it out-of-band.
        assert_eq!(b.list_pending().unwrap().len(), 1);
    }

    // list_pending tolerates a non-existent dir: nothing pending, not an error.
    #[test]
    fn list_pending_is_empty_when_no_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        assert!(b.list_pending().unwrap().is_empty());
    }

    // list_pending tolerates clutter in the intake dir: files with another
    // extension (including a leftover `.tmp`) and `.json` files that do not parse
    // as an intake are skipped, and the real intake is still listed.
    #[test]
    fn list_pending_skips_clutter() {
        let tmp = tempfile::tempdir().unwrap();
        let b = broker(tmp.path());
        let stored = b.create(sample()).unwrap();

        let dir = tmp.path().join(INTAKE_DIR);
        fs::write(dir.join("README.txt"), b"not an intake").unwrap();
        fs::write(dir.join("stale.tmp"), b"{}").unwrap();
        fs::write(dir.join("corrupt.json"), b"{ not json").unwrap();

        let pending = b.list_pending().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, stored.id);
    }
}