kovra_core/intake.rs
1//! Intake broker (KOV-69a) — the **inversion** of the approval broker
2//! ([`crate::file_confirm`]).
3//!
4//! The approval broker answers *"an agent asked to USE a secret — may it?"*: the
5//! requester blocks until a human approves in another process. The intake broker
6//! answers the mirror question *"an agent asked to CREATE a secret — supply the
7//! value"*: the agent's request is **non-blocking** and the value arrives later,
8//! out-of-band, from a human-summoned fulfillment surface (CLI `kovra intake`,
9//! the menu-bar app in KOV-69b).
10//!
11//! The point is the security boundary: the agent (via the MCP `request_secret`
12//! tool) only ever names the **coordinate/sensitivity/environment** — never a
13//! value. The value goes human → vault directly and never enters model context
14//! (I11/I14). A [`PendingIntake`] therefore carries only authoritative
15//! address/metadata, exactly like [`crate::file_confirm::PendingRequest`] — no
16//! value field exists (I12). Files are `0600` under a `0700` dir, written
17//! atomically (temp → rename).
18//!
19//! Sealing is **not** done here: this broker owns only the pending-intake
20//! lifecycle (create / list / get / cancel) and has no vault coupling. The
21//! fulfillment surface re-reads the pending intake, runs the attended
22//! [`crate::confirm::Confirmer`] gate (I16), seals the human-supplied value
23//! through the existing `set` path (born per I5), and then [`cancel`]s the
24//! intake. Keeping the broker pure keeps its invariants legible and unit-testable
25//! without a vault.
26//!
27//! Anti-phishing (KOV-69, structural): there is **no** method here that both
28//! creates an intake and opens/reveals a value-capture surface. [`create`] only
29//! writes a file; the capture surface is opened solely by a human action in the
30//! fulfillment layer. The per-session identity nonce that the menu-bar surface
31//! cross-checks (KOV-69b) is a property of that *surface*, not of an
32//! agent-created record, so it deliberately does not live on [`PendingIntake`].
33//!
34//! [`create`]: IntakeBroker::create
35//! [`cancel`]: IntakeBroker::cancel
36
37use std::fs;
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use serde::{Deserialize, Serialize};
43
44use crate::confirm::Untrusted;
45use crate::error::CoreError;
46use crate::scope::Origin;
47use crate::sensitivity::Sensitivity;
48use crate::store;
49
50/// The conventional pending-intake subdirectory under the registry root.
51pub const INTAKE_DIR: &str = "intake";
52
53const INTAKE_EXT: &str = "json";
54
55/// A pending agent-initiated secret-creation request, persisted for the
56/// human-summoned fulfillment surface (CLI `kovra intake list`, the menu-bar app).
57///
58/// Carries only the authoritative address/metadata the agent named — **never a
59/// value** (I11/I12/I14). There is no `value` field by construction; the value is
60/// supplied out-of-band at fulfillment and goes straight to the vault.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct PendingIntake {
63 /// The operator-typable intake id (`<unix>-<pid>-<n>`, same scheme as the
64 /// approval broker).
65 pub id: String,
66 /// Seconds since the Unix epoch when the intake was created.
67 pub created_unix: u64,
68 /// The coordinate the value will be sealed under (an address, not a value).
69 pub coordinate: String,
70 /// Sensitivity the secret will be born with (a `prod` secret is born `high`,
71 /// I5 — enforced at the seal, recorded here for the fulfillment prompt).
72 pub sensitivity: Sensitivity,
73 /// Environment (`prod` is highlighted by the renderer).
74 pub environment: String,
75 /// Who initiated the intake — weighs into the human's decision.
76 pub origin: Origin,
77 /// The requesting process / caller identity, a **trusted, observed fact** set
78 /// by the FFI/CLI boundary (never from requester text). Carries no value.
79 pub requesting_process: Option<String>,
80 /// Optional requester free-text, segregated as untrusted (I16) — the agent's
81 /// "why", never the authoritative line.
82 pub description: Option<Untrusted>,
83}
84
85impl PendingIntake {
86 /// Build an intake request from the authoritative fields. `id`/`created_unix`
87 /// are placeholders until [`IntakeBroker::create`] stamps them.
88 pub fn new(
89 coordinate: impl Into<String>,
90 sensitivity: Sensitivity,
91 environment: impl Into<String>,
92 origin: Origin,
93 ) -> Self {
94 Self {
95 id: String::new(),
96 created_unix: 0,
97 coordinate: coordinate.into(),
98 sensitivity,
99 environment: environment.into(),
100 origin,
101 requesting_process: None,
102 description: None,
103 }
104 }
105
106 /// Attach the **trusted, observed** requesting-process identity (mirrors
107 /// [`crate::confirm::ConfirmRequest::with_requesting_process`]).
108 pub fn with_requesting_process(mut self, s: impl Into<String>) -> Self {
109 self.requesting_process = Some(s.into());
110 self
111 }
112
113 /// Attach segregated, untrusted requester free-text.
114 pub fn with_description(mut self, text: impl Into<String>) -> Self {
115 self.description = Some(Untrusted(text.into()));
116 self
117 }
118}
119
120/// A file-backed queue of pending intakes under `<root>/intake/`.
121pub struct IntakeBroker {
122 dir: PathBuf,
123 counter: AtomicU64,
124}
125
126impl IntakeBroker {
127 /// A broker writing under `dir` (the intake directory itself).
128 pub fn new(dir: impl Into<PathBuf>) -> Self {
129 Self {
130 dir: dir.into(),
131 counter: AtomicU64::new(0),
132 }
133 }
134
135 /// A broker rooted at a registry root: `<root>/intake/`.
136 pub fn under_root(root: &Path) -> Self {
137 Self::new(root.join(INTAKE_DIR))
138 }
139
140 fn intake_path(&self, id: &str) -> PathBuf {
141 self.dir.join(format!("{id}.{INTAKE_EXT}"))
142 }
143
144 fn next_id(&self) -> String {
145 let secs = SystemTime::now()
146 .duration_since(UNIX_EPOCH)
147 .map(|d| d.as_secs())
148 .unwrap_or(0);
149 let n = self.counter.fetch_add(1, Ordering::SeqCst);
150 format!("{secs}-{}-{n}", std::process::id())
151 }
152
153 /// Record a new pending intake and return it with its stamped id. This is the
154 /// agent-facing entry point: it is **non-blocking** (unlike the approval
155 /// broker's `confirm`) and persists only address/metadata — no value.
156 pub fn create(&self, mut intake: PendingIntake) -> Result<PendingIntake, CoreError> {
157 intake.id = self.next_id();
158 intake.created_unix = SystemTime::now()
159 .duration_since(UNIX_EPOCH)
160 .map(|d| d.as_secs())
161 .unwrap_or(0);
162 store::ensure_dir(&self.dir)?;
163 let bytes = serde_json::to_vec(&intake)
164 .map_err(|e| CoreError::Io(format!("serialize intake: {e}")))?;
165 atomic_write(&self.intake_path(&intake.id), &bytes)?;
166 Ok(intake)
167 }
168
169 /// Enumerate the open pending intakes, sorted by id (feeds `kovra intake list`
170 /// and the menu-bar badge).
171 pub fn list_pending(&self) -> Result<Vec<PendingIntake>, CoreError> {
172 let mut out = Vec::new();
173 if !self.dir.exists() {
174 return Ok(out);
175 }
176 let entries =
177 fs::read_dir(&self.dir).map_err(|e| CoreError::Io(format!("read intake dir: {e}")))?;
178 for entry in entries {
179 let path = entry
180 .map_err(|e| CoreError::Io(format!("intake entry: {e}")))?
181 .path();
182 if path.extension().and_then(|e| e.to_str()) != Some(INTAKE_EXT) {
183 continue;
184 }
185 let bytes = match fs::read(&path) {
186 Ok(b) => b,
187 Err(_) => continue, // racing cleanup — skip
188 };
189 if let Ok(intake) = serde_json::from_slice::<PendingIntake>(&bytes) {
190 out.push(intake);
191 }
192 }
193 out.sort_by(|a, b| a.id.cmp(&b.id));
194 Ok(out)
195 }
196
197 /// Read a single pending intake by id (the fulfillment surface re-reads the
198 /// authoritative record before prompting). `None` if no such intake is open.
199 pub fn get(&self, id: &str) -> Result<Option<PendingIntake>, CoreError> {
200 match fs::read(self.intake_path(id)) {
201 Ok(bytes) => serde_json::from_slice::<PendingIntake>(&bytes)
202 .map(Some)
203 .map_err(|e| CoreError::Io(format!("parse intake: {e}"))),
204 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
205 Err(e) => Err(CoreError::Io(format!("read intake: {e}"))),
206 }
207 }
208
209 /// Remove a pending intake (after it is fulfilled, or to cancel it). Returns
210 /// `false` if no such intake was open.
211 pub fn cancel(&self, id: &str) -> Result<bool, CoreError> {
212 match fs::remove_file(self.intake_path(id)) {
213 Ok(()) => Ok(true),
214 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
215 Err(e) => Err(CoreError::Io(format!("remove intake: {e}"))),
216 }
217 }
218}
219
220/// Write `bytes` to `path` atomically (temp → rename), `0600` on Unix — mirrors
221/// the approval broker's `atomic_write`, reusing `store::restrict` (the single
222/// owner of kovra's on-disk permission policy).
223fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), CoreError> {
224 let tmp = path.with_extension("tmp");
225 fs::write(&tmp, bytes).map_err(|e| CoreError::Io(format!("write intake: {e}")))?;
226 store::restrict(&tmp, 0o600)?;
227 fs::rename(&tmp, path).map_err(|e| CoreError::Io(format!("rename intake: {e}")))?;
228 Ok(())
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234
235 fn broker(dir: &Path) -> IntakeBroker {
236 IntakeBroker::new(dir.join(INTAKE_DIR))
237 }
238
239 fn sample() -> PendingIntake {
240 PendingIntake::new("prod/db/password", Sensitivity::High, "prod", Origin::Agent)
241 .with_requesting_process("node (pid 1234)")
242 .with_description("rotating the prod DB password")
243 }
244
245 // I11/I14 — the persisted intake carries the address/metadata but NEVER a
246 // value: there is no `value` field in the model, and none appears on disk.
247 #[test]
248 fn persisted_intake_has_no_value_field() {
249 let tmp = tempfile::tempdir().unwrap();
250 let b = broker(tmp.path());
251 let stored = b.create(sample()).unwrap();
252
253 let raw = fs::read_to_string(b.intake_path(&stored.id)).unwrap();
254 let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
255 let obj = json.as_object().unwrap();
256 assert!(
257 !obj.contains_key("value"),
258 "intake must never persist a value (I11/I12/I14): {raw}"
259 );
260 // The authoritative address/metadata IS present.
261 assert_eq!(obj["coordinate"], "prod/db/password");
262 assert_eq!(obj["environment"], "prod");
263 }
264
265 // I12 — the intake file is owner-only (0600) on Unix, like the approval broker.
266 #[cfg(unix)]
267 #[test]
268 fn intake_file_is_owner_only() {
269 use std::os::unix::fs::PermissionsExt;
270 let tmp = tempfile::tempdir().unwrap();
271 let b = broker(tmp.path());
272 let stored = b.create(sample()).unwrap();
273 let mode = fs::metadata(b.intake_path(&stored.id))
274 .unwrap()
275 .permissions()
276 .mode()
277 & 0o777;
278 assert_eq!(mode, 0o600, "intake file must be 0600");
279 }
280
281 // create is non-blocking and list_pending surfaces the authoritative request;
282 // the agent-supplied description is fenced as Untrusted, never authoritative.
283 #[test]
284 fn create_is_non_blocking_and_lists_authoritative_metadata() {
285 let tmp = tempfile::tempdir().unwrap();
286 let b = broker(tmp.path());
287 // create returns immediately (no Confirmer, no block) with a stamped id.
288 let stored = b.create(sample()).unwrap();
289 assert!(!stored.id.is_empty());
290
291 let pending = b.list_pending().unwrap();
292 assert_eq!(pending.len(), 1);
293 let p = &pending[0];
294 assert_eq!(p.coordinate, "prod/db/password");
295 assert_eq!(p.sensitivity, Sensitivity::High);
296 assert_eq!(p.environment, "prod");
297 assert_eq!(p.requesting_process.as_deref(), Some("node (pid 1234)"));
298 // The description is segregated as untrusted.
299 let desc = p.description.as_ref().unwrap();
300 assert!(format!("{desc}").contains("untrusted"));
301 }
302
303 // get re-reads the authoritative record; cancel removes it (the fulfillment
304 // surface cancels after sealing). cancel of an unknown id is false.
305 #[test]
306 fn get_then_cancel_roundtrips() {
307 let tmp = tempfile::tempdir().unwrap();
308 let b = broker(tmp.path());
309 let stored = b.create(sample()).unwrap();
310
311 let got = b.get(&stored.id).unwrap().unwrap();
312 assert_eq!(got, stored);
313
314 assert!(b.cancel(&stored.id).unwrap());
315 assert!(b.get(&stored.id).unwrap().is_none());
316 assert!(b.list_pending().unwrap().is_empty());
317 // cancelling again (or an unknown id) is a no-op false, not an error.
318 assert!(!b.cancel(&stored.id).unwrap());
319 assert!(!b.cancel("no-such-id").unwrap());
320 }
321
322 // Anti-phishing / human-summoned-only (structural): the broker has no API that
323 // both creates an intake AND yields a value or opens a capture surface. The
324 // only product of create() is a PendingIntake of pure metadata; the value can
325 // only arrive out-of-band at the fulfillment layer. This test pins that the
326 // created record is value-free and that completing an intake is a separate,
327 // human-driven step (cancel after an out-of-band seal), not a broker method.
328 #[test]
329 fn broker_never_couples_creation_to_value_capture() {
330 let tmp = tempfile::tempdir().unwrap();
331 let b = broker(tmp.path());
332 let stored = b.create(sample()).unwrap();
333 // The returned record is pure metadata — there is no value to leak.
334 let serialized = serde_json::to_string(&stored).unwrap();
335 assert!(!serialized.contains("value"));
336 // Nothing was sealed by create(): the intake is still pending until a
337 // human fulfills it out-of-band.
338 assert_eq!(b.list_pending().unwrap().len(), 1);
339 }
340
341 // list_pending tolerates a non-existent dir and non-json clutter.
342 #[test]
343 fn list_pending_is_empty_when_no_dir() {
344 let tmp = tempfile::tempdir().unwrap();
345 let b = broker(tmp.path());
346 assert!(b.list_pending().unwrap().is_empty());
347 }
348}