kovra_core/intake.rs
1//! Intake broker (KOV-69a) — the **inversion** of the approval broker
2//! ([`crate::file_confirm`]).
3//!
4//! The approval broker answers *"an agent asked to USE a secret — may it?"*: the
5//! requester blocks until a human approves in another process. The intake broker
6//! answers the mirror question *"an agent asked to CREATE a secret — supply the
7//! value"*: the agent's request is **non-blocking** and the value arrives later,
8//! out-of-band, from a human-summoned fulfillment surface (CLI `kovra intake`,
9//! the menu-bar app in KOV-69b).
10//!
11//! The point is the security boundary: the agent (via the MCP `request_secret`
12//! tool) only ever names the **coordinate/sensitivity/environment** — never a
13//! value. The value goes human → vault directly and never enters model context
14//! (I11/I14). A [`PendingIntake`] therefore carries only authoritative
15//! address/metadata, exactly like [`crate::file_confirm::PendingRequest`] — no
16//! value field exists (I12). Files are `0600` under a `0700` dir, written
17//! atomically (temp → rename).
18//!
19//! Sealing is **not** done here: this broker owns only the pending-intake
20//! lifecycle (create / list / get / cancel) and has no vault coupling. The
21//! fulfillment surface re-reads the pending intake, runs the attended
22//! [`crate::confirm::Confirmer`] gate (I16), seals the human-supplied value
23//! through the existing `set` path (born per I5), and then [`cancel`]s the
24//! intake. Keeping the broker pure keeps its invariants legible and unit-testable
25//! without a vault.
26//!
27//! Anti-phishing (KOV-69, structural): there is **no** method here that both
28//! creates an intake and opens/reveals a value-capture surface. [`create`] only
29//! writes a file; the capture surface is opened solely by a human action in the
30//! fulfillment layer. The per-session identity nonce that the menu-bar surface
31//! cross-checks (KOV-69b) is a property of that *surface*, not of an
32//! agent-created record, so it deliberately does not live on [`PendingIntake`].
33//!
34//! [`create`]: IntakeBroker::create
35//! [`cancel`]: IntakeBroker::cancel
36
37use std::fs;
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use serde::{Deserialize, Serialize};
43
44use crate::confirm::Untrusted;
45use crate::error::CoreError;
46use crate::scope::Origin;
47use crate::sensitivity::Sensitivity;
48use crate::store;
49
50/// The conventional pending-intake subdirectory under the registry root.
51pub const INTAKE_DIR: &str = "intake";
52
53const INTAKE_EXT: &str = "json";
54
55/// A pending agent-initiated secret-creation request, persisted for the
56/// human-summoned fulfillment surface (CLI `kovra intake list`, the menu-bar app).
57///
58/// Carries only the authoritative address/metadata the agent named — **never a
59/// value** (I11/I12/I14). There is no `value` field by construction; the value is
60/// supplied out-of-band at fulfillment and goes straight to the vault.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct PendingIntake {
63 /// The operator-typable intake id (`<unix>-<pid>-<n>`, same scheme as the
64 /// approval broker).
65 pub id: String,
66 /// Seconds since the Unix epoch when the intake was created.
67 pub created_unix: u64,
68 /// The coordinate the value will be sealed under (an address, not a value).
69 pub coordinate: String,
70 /// Sensitivity the secret will be born with (a `prod` secret is born `high`,
71 /// I5 — enforced at the seal, recorded here for the fulfillment prompt).
72 pub sensitivity: Sensitivity,
73 /// Environment (`prod` is highlighted by the renderer).
74 pub environment: String,
75 /// Who initiated the intake — weighs into the human's decision.
76 pub origin: Origin,
77 /// The requesting process / caller identity, a **trusted, observed fact** set
78 /// by the FFI/CLI boundary (never from requester text). Carries no value.
79 pub requesting_process: Option<String>,
80 /// Optional requester free-text, segregated as untrusted (I16) — the agent's
81 /// "why", never the authoritative line.
82 pub description: Option<Untrusted>,
83 /// Optional destination **project vault** the value should be sealed into
84 /// (`None` ⇒ the global vault). An address hint, never a value — it just
85 /// routes which vault the fulfillment seals into and preselects the project
86 /// in the native fulfill window. `#[serde(default)]` keeps intakes written
87 /// before this field deserializable.
88 #[serde(default)]
89 pub project: Option<String>,
90}
91
92impl PendingIntake {
93 /// Build an intake request from the authoritative fields. `id`/`created_unix`
94 /// are placeholders until [`IntakeBroker::create`] stamps them.
95 pub fn new(
96 coordinate: impl Into<String>,
97 sensitivity: Sensitivity,
98 environment: impl Into<String>,
99 origin: Origin,
100 ) -> Self {
101 Self {
102 id: String::new(),
103 created_unix: 0,
104 coordinate: coordinate.into(),
105 sensitivity,
106 environment: environment.into(),
107 origin,
108 requesting_process: None,
109 description: None,
110 project: None,
111 }
112 }
113
114 /// Attach the **trusted, observed** requesting-process identity (mirrors
115 /// [`crate::confirm::ConfirmRequest::with_requesting_process`]).
116 pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
117 self.requesting_process = Some(s.into());
118 self
119 }
120
121 /// Attach segregated, untrusted requester free-text.
122 pub fn with_description(mut self, text: impl Into<String>) -> Self {
123 self.description = Some(Untrusted(text.into()));
124 self
125 }
126
127 /// Attach the destination project vault (`None` ⇒ global). Routes which vault
128 /// the fulfillment seals into and preselects it in the fulfill window.
129 pub fn with_project(mut self, project: Option<String>) -> Self {
130 self.project = project.filter(|p| !p.is_empty());
131 self
132 }
133}
134
135/// A file-backed queue of pending intakes under `<root>/intake/`.
136pub struct IntakeBroker {
137 dir: PathBuf,
138 counter: AtomicU64,
139}
140
141impl IntakeBroker {
142 /// A broker writing under `dir` (the intake directory itself).
143 pub fn new(dir: impl Into<PathBuf>) -> Self {
144 Self {
145 dir: dir.into(),
146 counter: AtomicU64::new(0),
147 }
148 }
149
150 /// A broker rooted at a registry root: `<root>/intake/`.
151 pub fn under_root(root: &Path) -> Self {
152 Self::new(root.join(INTAKE_DIR))
153 }
154
155 fn intake_path(&self, id: &str) -> PathBuf {
156 self.dir.join(format!("{id}.{INTAKE_EXT}"))
157 }
158
159 fn next_id(&self) -> String {
160 let secs = SystemTime::now()
161 .duration_since(UNIX_EPOCH)
162 .map(|d| d.as_secs())
163 .unwrap_or(0);
164 let n = self.counter.fetch_add(1, Ordering::SeqCst);
165 format!("{secs}-{}-{n}", std::process::id())
166 }
167
168 /// Record a new pending intake and return it with its stamped id. This is the
169 /// agent-facing entry point: it is **non-blocking** (unlike the approval
170 /// broker's `confirm`) and persists only address/metadata — no value.
171 pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
172 intake.id = self.next_id();
173 intake.created_unix = SystemTime::now()
174 .duration_since(UNIX_EPOCH)
175 .map(|d| d.as_secs())
176 .unwrap_or(0);
177 store::ensure_dir(&self.dir)?;
178 let bytes = serde_json::to_vec(&intake)
179 .map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
180 atomic_write(&self.intake_path(&intake.id), &bytes)?;
181 Ok(intake)
182 }
183
184 /// Enumerate the open pending intakes, sorted by id (feeds `kovra intake list`
185 /// and the menu-bar badge).
186 pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
187 let mut out = Vec::new();
188 if !self.dir.exists() {
189 return Ok(out);
190 }
191 let entries =
192 fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
193 for entry in entries {
194 let path = entry
195 .map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
196 .path();
197 if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
198 continue;
199 }
200 let bytes = match fs::read(&path) {
201 Ok(b) => b,
202 Err(_) => continue, // racing cleanup — skip
203 };
204 if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
205 out.push(intake);
206 }
207 }
208 out.sort_by(|a, b| a.id.cmp(&b.id));
209 Ok(out)
210 }
211
212 /// Read a single pending intake by id (the fulfillment surface re-reads the
213 /// authoritative record before prompting). `None` if no such intake is open.
214 pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
215 match fs::read(self.intake_path(id)) {
216 Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
217 .map(Some)
218 .map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
219 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
220 Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
221 }
222 }
223
224 /// Remove a pending intake (after it is fulfilled, or to cancel it). Returns
225 /// `false` if no such intake was open.
226 pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
227 match fs::remove_file(self.intake_path(id)) {
228 Ok(()) => Ok(true),
229 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
230 Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
231 }
232 }
233}
234
235/// Write `bytes` to `path` atomically (temp → rename), `0600` on Unix — mirrors
236/// the approval broker's `atomic_write`, reusing `store::restrict` (the single
237/// owner of kovra's on-disk permission policy).
238fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
239 let tmp = path.with_extension("tmp");
240 fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
241 store::restrict(&tmp, 0o600)?;
242 fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
243 Ok(())
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249
250 fn broker(dir: &Path) -> IntakeBroker {
251 IntakeBroker::new(dir.join(INTAKE_DIR))
252 }
253
254 fn sample() -> PendingIntake {
255 PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
256 .with_requesting_process("node (pid 1234)")
257 .with_description("rotating the prod DB password")
258 }
259
260 // I11/I14 — the persisted intake carries the address/metadata but NEVER a
261 // value: there is no `value` field in the model, and none appears on disk.
262 #[test]
263 fn persisted_intake_has_no_value_field() {
264 let tmp = tempfile::tempdir().unwrap();
265 let b = broker(tmp.path());
266 let stored = b.create(sample()).unwrap();
267
268 let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
269 let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
270 let obj = json.as_object().unwrap();
271 assert!(
272 !obj.contains_key("value"),
273 "intake must never persist a value (I11/I12/I14): {raw}"
274 );
275 // The authoritative address/metadata IS present.
276 assert_eq!(obj["coordinate"], "prod/db/password");
277 assert_eq!(obj["environment"], "prod");
278 }
279
280 // KOV-69 (project wiring) — an agent-named destination project round-trips
281 // through create → list/get, and an intake written before the field existed
282 // (no `project` key) still deserializes (serde default ⇒ None). The project is
283 // an address hint (routes the destination vault + preselects the fulfill
284 // dropdown), never a value.
285 #[test]
286 fn project_roundtrips_and_is_serde_backward_compatible() {
287 let tmp = tempfile::tempdir().unwrap();
288 let b = broker(tmp.path());
289
290 // With a project: it survives persistence and is read back.
291 let stored = b
292 .create(sample().with_project(Some("payments".to_string())))
293 .unwrap();
294 assert_eq!(stored.project.as_deref(), Some("payments"));
295 assert_eq!(
296 b.get(&stored.id).unwrap().unwrap().project.as_deref(),
297 Some("payments")
298 );
299 // An empty project name normalises to None (global vault).
300 assert_eq!(
301 sample().with_project(Some(String::new())).project,
302 None,
303 "empty project ⇒ global (None)"
304 );
305
306 // Backward compat: an older intake JSON with no `project` key still loads.
307 let legacy = r#"{"id":"1-2-0","created_unix":0,"coordinate":"dev/app/token",
308 "sensitivity":"medium","environment":"dev","origin":"agent",
309 "requesting_process":null,"description":null}"#;
310 let parsed: PendingIntake = serde_json::from_str(legacy).unwrap();
311 assert_eq!(parsed.project, None);
312 }
313
314 // I12 — the intake file is owner-only (0600) on Unix, like the approval broker.
315 #[cfg(unix)]
316 #[test]
317 fn intake_file_is_owner_only() {
318 use std::os::unix::fs::PermissionsExt;
319 let tmp = tempfile::tempdir().unwrap();
320 let b = broker(tmp.path());
321 let stored = b.create(sample()).unwrap();
322 let mode = fs::metadata(b.intake_path(&stored.id))
323 .unwrap()
324 .permissions()
325 .mode()
326 & 0o777;
327 assert_eq!(mode, 0o600, "intake file must be 0600");
328 }
329
330 // create is non-blocking and list_pending surfaces the authoritative request;
331 // the agent-supplied description is fenced as Untrusted, never authoritative.
332 #[test]
333 fn create_is_non_blocking_and_lists_authoritative_metadata() {
334 let tmp = tempfile::tempdir().unwrap();
335 let b = broker(tmp.path());
336 // create returns immediately (no Confirmer, no block) with a stamped id.
337 let stored = b.create(sample()).unwrap();
338 assert!(!stored.id.is_empty());
339
340 let pending = b.list_pending().unwrap();
341 assert_eq!(pending.len(), 1);
342 let p = &pending[0];
343 assert_eq!(p.coordinate, "prod/db/password");
344 assert_eq!(p.sensitivity, Sensitivity::High);
345 assert_eq!(p.environment, "prod");
346 assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
347 // The description is segregated as untrusted.
348 let desc = p.description.as_ref().unwrap();
349 assert!(format!("{desc}").contains("untrusted"));
350 }
351
352 // get re-reads the authoritative record; cancel removes it (the fulfillment
353 // surface cancels after sealing). cancel of an unknown id is false.
354 #[test]
355 fn get_then_cancel_roundtrips() {
356 let tmp = tempfile::tempdir().unwrap();
357 let b = broker(tmp.path());
358 let stored = b.create(sample()).unwrap();
359
360 let got = b.get(&stored.id).unwrap().unwrap();
361 assert_eq!(got, stored);
362
363 assert!(b.cancel(&stored.id).unwrap());
364 assert!(b.get(&stored.id).unwrap().is_none());
365 assert!(b.list_pending().unwrap().is_empty());
366 // cancelling again (or an unknown id) is a no-op false, not an error.
367 assert!(!b.cancel(&stored.id).unwrap());
368 assert!(!b.cancel("no-such-id").unwrap());
369 }
370
371 // Anti-phishing / human-summoned-only (structural): the broker has no API that
372 // both creates an intake AND yields a value or opens a capture surface. The
373 // only product of create() is a PendingIntake of pure metadata; the value can
374 // only arrive out-of-band at the fulfillment layer. This test pins that the
375 // created record is value-free and that completing an intake is a separate,
376 // human-driven step (cancel after an out-of-band seal), not a broker method.
377 #[test]
378 fn broker_never_couples_creation_to_value_capture() {
379 let tmp = tempfile::tempdir().unwrap();
380 let b = broker(tmp.path());
381 let stored = b.create(sample()).unwrap();
382 // The returned record is pure metadata — there is no value to leak.
383 let serialized = serde_json::to_string(&stored).unwrap();
384 assert!(!serialized.contains("value"));
385 // Nothing was sealed by create(): the intake is still pending until a
386 // human fulfills it out-of-band.
387 assert_eq!(b.list_pending().unwrap().len(), 1);
388 }
389
390 // list_pending tolerates a non-existent dir and non-json clutter.
391 #[test]
392 fn list_pending_is_empty_when_no_dir() {
393 let tmp = tempfile::tempdir().unwrap();
394 let b = broker(tmp.path());
395 assert!(b.list_pending().unwrap().is_empty());
396 }
397}