Skip to main content

kovra_core/
intake.rs

1//! Intake broker (KOV-69a) — the **inversion** of the approval broker
2//! ([`crate::file_confirm`]).
3//!
4//! The approval broker answers *"an agent asked to USE a secret — may it?"*: the
5//! requester blocks until a human approves in another process. The intake broker
6//! answers the mirror question *"an agent asked to CREATE a secret — supply the
7//! value"*: the agent's request is **non-blocking** and the value arrives later,
8//! out-of-band, from a human-summoned fulfillment surface (CLI `kovra intake`,
9//! the menu-bar app in KOV-69b).
10//!
11//! The point is the security boundary: the agent (via the MCP `request_secret`
12//! tool) only ever names the **coordinate/sensitivity/environment** — never a
13//! value. The value goes human → vault directly and never enters model context
14//! (I11/I14). A [`PendingIntake`] therefore carries only authoritative
15//! address/metadata, exactly like [`crate::file_confirm::PendingRequest`] — no
16//! value field exists (I12). Files are `0600` under a `0700` dir, written
17//! atomically (temp → rename).
18//!
19//! Sealing is **not** done here: this broker owns only the pending-intake
20//! lifecycle (create / list / get / cancel) and has no vault coupling. The
21//! fulfillment surface re-reads the pending intake, runs the attended
22//! [`crate::confirm::Confirmer`] gate (I16), seals the human-supplied value
23//! through the existing `set` path (born per I5), and then [`cancel`]s the
24//! intake. Keeping the broker pure keeps its invariants legible and unit-testable
25//! without a vault.
26//!
27//! Anti-phishing (KOV-69, structural): there is **no** method here that both
28//! creates an intake and opens/reveals a value-capture surface. [`create`] only
29//! writes a file; the capture surface is opened solely by a human action in the
30//! fulfillment layer. The per-session identity nonce that the menu-bar surface
31//! cross-checks (KOV-69b) is a property of that *surface*, not of an
32//! agent-created record, so it deliberately does not live on [`PendingIntake`].
33//!
34//! [`create`]: IntakeBroker::create
35//! [`cancel`]: IntakeBroker::cancel
36
37use std::fs;
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use serde::{Deserialize, Serialize};
43
44use crate::confirm::Untrusted;
45use crate::error::CoreError;
46use crate::scope::Origin;
47use crate::sensitivity::Sensitivity;
48use crate::store;
49
50/// The conventional pending-intake subdirectory under the registry root.
51pub const INTAKE_DIR: &str = "intake";
52
53const INTAKE_EXT: &str = "json";
54
55/// A pending agent-initiated secret-creation request, persisted for the
56/// human-summoned fulfillment surface (CLI `kovra intake list`, the menu-bar app).
57///
58/// Carries only the authoritative address/metadata the agent named — **never a
59/// value** (I11/I12/I14). There is no `value` field by construction; the value is
60/// supplied out-of-band at fulfillment and goes straight to the vault.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct PendingIntake {
63    /// The operator-typable intake id (`<unix>-<pid>-<n>`, same scheme as the
64    /// approval broker).
65    pub id: String,
66    /// Seconds since the Unix epoch when the intake was created.
67    pub created_unix: u64,
68    /// The coordinate the value will be sealed under (an address, not a value).
69    pub coordinate: String,
70    /// Sensitivity the secret will be born with (a `prod` secret is born `high`,
71    /// I5 — enforced at the seal, recorded here for the fulfillment prompt).
72    pub sensitivity: Sensitivity,
73    /// Environment (`prod` is highlighted by the renderer).
74    pub environment: String,
75    /// Who initiated the intake — weighs into the human's decision.
76    pub origin: Origin,
77    /// The requesting process / caller identity, a **trusted, observed fact** set
78    /// by the FFI/CLI boundary (never from requester text). Carries no value.
79    pub requesting_process: Option<String>,
80    /// Optional requester free-text, segregated as untrusted (I16) — the agent's
81    /// "why", never the authoritative line.
82    pub description: Option<Untrusted>,
83}
84
85impl PendingIntake {
86    /// Build an intake request from the authoritative fields. `id`/`created_unix`
87    /// are placeholders until [`IntakeBroker::create`] stamps them.
88    pub fn new(
89        coordinate: impl Into<String>,
90        sensitivity: Sensitivity,
91        environment: impl Into<String>,
92        origin: Origin,
93    ) -> Self {
94        Self {
95            id: String::new(),
96            created_unix: 0,
97            coordinate: coordinate.into(),
98            sensitivity,
99            environment: environment.into(),
100            origin,
101            requesting_process: None,
102            description: None,
103        }
104    }
105
106    /// Attach the **trusted, observed** requesting-process identity (mirrors
107    /// [`crate::confirm::ConfirmRequest::with_requesting_process`]).
108    pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
109        self.requesting_process = Some(s.into());
110        self
111    }
112
113    /// Attach segregated, untrusted requester free-text.
114    pub fn with_description(mut self, text: impl Into<String>) -> Self {
115        self.description = Some(Untrusted(text.into()));
116        self
117    }
118}
119
120/// A file-backed queue of pending intakes under `<root>/intake/`.
121pub struct IntakeBroker {
122    dir: PathBuf,
123    counter: AtomicU64,
124}
125
126impl IntakeBroker {
127    /// A broker writing under `dir` (the intake directory itself).
128    pub fn new(dir: impl Into<PathBuf>) -> Self {
129        Self {
130            dir: dir.into(),
131            counter: AtomicU64::new(0),
132        }
133    }
134
135    /// A broker rooted at a registry root: `<root>/intake/`.
136    pub fn under_root(root: &Path) -> Self {
137        Self::new(root.join(INTAKE_DIR))
138    }
139
140    fn intake_path(&self, id: &str) -> PathBuf {
141        self.dir.join(format!("{id}.{INTAKE_EXT}"))
142    }
143
144    fn next_id(&self) -> String {
145        let secs = SystemTime::now()
146            .duration_since(UNIX_EPOCH)
147            .map(|d| d.as_secs())
148            .unwrap_or(0);
149        let n = self.counter.fetch_add(1, Ordering::SeqCst);
150        format!("{secs}-{}-{n}", std::process::id())
151    }
152
153    /// Record a new pending intake and return it with its stamped id. This is the
154    /// agent-facing entry point: it is **non-blocking** (unlike the approval
155    /// broker's `confirm`) and persists only address/metadata — no value.
156    pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
157        intake.id = self.next_id();
158        intake.created_unix = SystemTime::now()
159            .duration_since(UNIX_EPOCH)
160            .map(|d| d.as_secs())
161            .unwrap_or(0);
162        store::ensure_dir(&self.dir)?;
163        let bytes = serde_json::to_vec(&intake)
164            .map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
165        atomic_write(&self.intake_path(&intake.id), &bytes)?;
166        Ok(intake)
167    }
168
169    /// Enumerate the open pending intakes, sorted by id (feeds `kovra intake list`
170    /// and the menu-bar badge).
171    pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
172        let mut out = Vec::new();
173        if !self.dir.exists() {
174            return Ok(out);
175        }
176        let entries =
177            fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
178        for entry in entries {
179            let path = entry
180                .map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
181                .path();
182            if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
183                continue;
184            }
185            let bytes = match fs::read(&path) {
186                Ok(b) => b,
187                Err(_) => continue, // racing cleanup — skip
188            };
189            if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
190                out.push(intake);
191            }
192        }
193        out.sort_by(|a, b| a.id.cmp(&b.id));
194        Ok(out)
195    }
196
197    /// Read a single pending intake by id (the fulfillment surface re-reads the
198    /// authoritative record before prompting). `None` if no such intake is open.
199    pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
200        match fs::read(self.intake_path(id)) {
201            Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
202                .map(Some)
203                .map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
204            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
205            Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
206        }
207    }
208
209    /// Remove a pending intake (after it is fulfilled, or to cancel it). Returns
210    /// `false` if no such intake was open.
211    pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
212        match fs::remove_file(self.intake_path(id)) {
213            Ok(()) => Ok(true),
214            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
215            Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
216        }
217    }
218}
219
220/// Write `bytes` to `path` atomically (temp → rename), `0600` on Unix — mirrors
221/// the approval broker's `atomic_write`, reusing `store::restrict` (the single
222/// owner of kovra's on-disk permission policy).
223fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
224    let tmp = path.with_extension("tmp");
225    fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
226    store::restrict(&tmp, 0o600)?;
227    fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
228    Ok(())
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234
235    fn broker(dir: &Path) -> IntakeBroker {
236        IntakeBroker::new(dir.join(INTAKE_DIR))
237    }
238
239    fn sample() -> PendingIntake {
240        PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
241            .with_requesting_process("node (pid 1234)")
242            .with_description("rotating the prod DB password")
243    }
244
245    // I11/I14 — the persisted intake carries the address/metadata but NEVER a
246    // value: there is no `value` field in the model, and none appears on disk.
247    #[test]
248    fn persisted_intake_has_no_value_field() {
249        let tmp = tempfile::tempdir().unwrap();
250        let b = broker(tmp.path());
251        let stored = b.create(sample()).unwrap();
252
253        let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
254        let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
255        let obj = json.as_object().unwrap();
256        assert!(
257            !obj.contains_key("value"),
258            "intake must never persist a value (I11/I12/I14): {raw}"
259        );
260        // The authoritative address/metadata IS present.
261        assert_eq!(obj["coordinate"], "prod/db/password");
262        assert_eq!(obj["environment"], "prod");
263    }
264
265    // I12 — the intake file is owner-only (0600) on Unix, like the approval broker.
266    #[cfg(unix)]
267    #[test]
268    fn intake_file_is_owner_only() {
269        use std::os::unix::fs::PermissionsExt;
270        let tmp = tempfile::tempdir().unwrap();
271        let b = broker(tmp.path());
272        let stored = b.create(sample()).unwrap();
273        let mode = fs::metadata(b.intake_path(&stored.id))
274            .unwrap()
275            .permissions()
276            .mode()
277            & 0o777;
278        assert_eq!(mode, 0o600, "intake file must be 0600");
279    }
280
281    // create is non-blocking and list_pending surfaces the authoritative request;
282    // the agent-supplied description is fenced as Untrusted, never authoritative.
283    #[test]
284    fn create_is_non_blocking_and_lists_authoritative_metadata() {
285        let tmp = tempfile::tempdir().unwrap();
286        let b = broker(tmp.path());
287        // create returns immediately (no Confirmer, no block) with a stamped id.
288        let stored = b.create(sample()).unwrap();
289        assert!(!stored.id.is_empty());
290
291        let pending = b.list_pending().unwrap();
292        assert_eq!(pending.len(), 1);
293        let p = &pending[0];
294        assert_eq!(p.coordinate, "prod/db/password");
295        assert_eq!(p.sensitivity, Sensitivity::High);
296        assert_eq!(p.environment, "prod");
297        assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
298        // The description is segregated as untrusted.
299        let desc = p.description.as_ref().unwrap();
300        assert!(format!("{desc}").contains("untrusted"));
301    }
302
303    // get re-reads the authoritative record; cancel removes it (the fulfillment
304    // surface cancels after sealing). cancel of an unknown id is false.
305    #[test]
306    fn get_then_cancel_roundtrips() {
307        let tmp = tempfile::tempdir().unwrap();
308        let b = broker(tmp.path());
309        let stored = b.create(sample()).unwrap();
310
311        let got = b.get(&stored.id).unwrap().unwrap();
312        assert_eq!(got, stored);
313
314        assert!(b.cancel(&stored.id).unwrap());
315        assert!(b.get(&stored.id).unwrap().is_none());
316        assert!(b.list_pending().unwrap().is_empty());
317        // cancelling again (or an unknown id) is a no-op false, not an error.
318        assert!(!b.cancel(&stored.id).unwrap());
319        assert!(!b.cancel("no-such-id").unwrap());
320    }
321
322    // Anti-phishing / human-summoned-only (structural): the broker has no API that
323    // both creates an intake AND yields a value or opens a capture surface. The
324    // only product of create() is a PendingIntake of pure metadata; the value can
325    // only arrive out-of-band at the fulfillment layer. This test pins that the
326    // created record is value-free and that completing an intake is a separate,
327    // human-driven step (cancel after an out-of-band seal), not a broker method.
328    #[test]
329    fn broker_never_couples_creation_to_value_capture() {
330        let tmp = tempfile::tempdir().unwrap();
331        let b = broker(tmp.path());
332        let stored = b.create(sample()).unwrap();
333        // The returned record is pure metadata — there is no value to leak.
334        let serialized = serde_json::to_string(&stored).unwrap();
335        assert!(!serialized.contains("value"));
336        // Nothing was sealed by create(): the intake is still pending until a
337        // human fulfills it out-of-band.
338        assert_eq!(b.list_pending().unwrap().len(), 1);
339    }
340
341    // list_pending tolerates a non-existent dir and non-json clutter.
342    #[test]
343    fn list_pending_is_empty_when_no_dir() {
344        let tmp = tempfile::tempdir().unwrap();
345        let b = broker(tmp.path());
346        assert!(b.list_pending().unwrap().is_empty());
347    }
348}