kanade_shared/wire/agent_config.rs
1//! Layered fleet configuration that lives in the `agent_config` KV
2//! bucket (Sprint 6).
3//!
4//! Three scopes flow into the agent's effective config, in order of
5//! increasing specificity:
6//!
7//! ```text
8//! built-in default (compiled in; floor when nothing else is set)
9//! ↓
10//! agent_config:global (whole-fleet default)
11//! ↓
12//! agent_config:groups.<g> (per-group override; one or more apply)
13//! ↓
14//! agent_config:pcs.<pc> (per-PC override; final word)
15//! ```
16//!
17//! The wire type for every scope is the same — [`ConfigScope`], a
18//! struct of `Option<T>` fields. `Some` means "this scope sets this
//! field"; `None` means "fall through to the next layer". An explicit
//! JSON `null` and an absent field both deserialize to `None`: serde
//! reads `null` as `None` for `Option` fields, and the struct-level
//! `#[serde(default)]` fills in anything missing.
22//!
23//! [`resolve`] is the pure functional core that flattens the scope
24//! stack into an [`EffectiveConfig`] (concrete values, no Options).
//! When the same field is set by more than one of the PC's groups,
//! the groups are applied in sorted name order and the last one wins
//! (CSS-cascade style); a [`ResolutionWarning::MultiGroupConflict`]
//! is also emitted so the caller can log it. That pre-empts the "why
//! does this PC have value X? none of my groups say X" debugging
//! session.
30//!
31//! v0.20.0: `inventory_interval` / `inventory_jitter` /
32//! `inventory_enabled` removed. They were leftovers from the
33//! v0.14-retired hardcoded WMI inventory loop; runtime inventory
34//! now lives in operator-defined probe jobs (`configs/jobs/
35//! inventory-*.yaml`), so the layered config no longer carries
36//! anything about it.
37
38use std::collections::BTreeMap;
39use std::time::Duration;
40
41use serde::{Deserialize, Serialize};
42
43/// Per-scope partial config. Every field is `Option<T>`: `Some` =
44/// set, `None` = inherit from the next-less-specific scope. Serde
45/// `default` + `skip_serializing_if` keeps the wire JSON tight —
46/// unset fields don't appear in the bucket value.
47#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
48#[serde(default)]
49pub struct ConfigScope {
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub target_version: Option<String>,
52 /// Random sleep window applied at each agent before it starts
53 /// downloading a new target_version, so a fleet-wide rollout
54 /// doesn't slam the Object Store / broker all at once
55 /// (humantime, e.g. `"30m"`). `"0s"` = no jitter (explicit
56 /// opt-in for canary / single-PC deploys); unset falls back to
57 /// the safe built-in default (10m — #491).
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub target_version_jitter: Option<String>,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub heartbeat_interval: Option<String>,
62 /// Cadence for the whole-host perf snapshot loop (`host_perf.<pc_id>`).
63 /// Separate from `heartbeat_interval` because the host-wide
64 /// sysinfo refresh is slightly heavier than the per-process self-
65 /// perf one (memory + disk + network counters in addition to CPU)
66 /// and gappier data is acceptable for graphing. Default 60 s.
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub host_perf_interval: Option<String>,
69 /// v0.41 / Phase 2: operator-driven opt-in for the heavy per-
70 /// process snapshot loop (`process_perf.<pc_id>`). Default off
71 /// because walking the full process table is the most expensive
72 /// sysinfo call on Citrix / RDS hosts; flip on only when an
73 /// operator is actively investigating a host. Paired with
74 /// `process_perf_expires_at` to auto-disable after a window —
75 /// see [`EffectiveConfig::process_perf_active_at`].
76 #[serde(skip_serializing_if = "Option::is_none")]
77 pub process_perf_enabled: Option<bool>,
78 /// Wall-clock RFC3339 timestamp after which `process_perf_enabled`
79 /// is considered expired and the agent stops publishing process
80 /// snapshots — even if the flag itself is still `true`. Lets the
81 /// SPA toggle "ON for 30 m" without the operator having to come
82 /// back and clear the flag manually. `None` (or the past) +
83 /// enabled=true means "indefinitely on" (rare; mostly a test path).
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
86 /// Top-N processes (ordered by CPU%) the agent publishes per tick.
87 /// 20 by default — enough to cover the usual suspects on a
88 /// constrained host without ballooning the projector row volume
89 /// when several PCs are simultaneously in investigation mode.
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub process_perf_top_n: Option<u32>,
92}
93
94impl ConfigScope {
95 pub fn is_empty(&self) -> bool {
96 self.target_version.is_none()
97 && self.target_version_jitter.is_none()
98 && self.heartbeat_interval.is_none()
99 && self.host_perf_interval.is_none()
100 && self.process_perf_enabled.is_none()
101 && self.process_perf_expires_at.is_none()
102 && self.process_perf_top_n.is_none()
103 }
104}
105
106/// Concrete config the agent runs against once the scope stack has
107/// been flattened. `target_version` stays `Option` because "no
108/// rollout target set anywhere" is a meaningful state (the agent
109/// just keeps running the version it has); the other fields always
110/// have a value, falling back to [`EffectiveConfig::builtin_defaults`]
111/// when no scope sets them.
112#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
113pub struct EffectiveConfig {
114 pub target_version: Option<String>,
115 pub target_version_jitter: String,
116 pub heartbeat_interval: String,
117 pub host_perf_interval: String,
118 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_enabled`].
119 pub process_perf_enabled: bool,
120 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_expires_at`].
121 pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
122 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_top_n`].
123 pub process_perf_top_n: u32,
124}
125
126impl EffectiveConfig {
127 /// Floor values used when no KV scope sets a given field.
128 pub fn builtin_defaults() -> Self {
129 Self {
130 target_version: None,
131 // #491: safe-by-default. The pre-Sprint-11 "0s" default
132 // meant a fleet-wide target_version flip made every
133 // agent pull the multi-MB binary from the Object Store
134 // at the same instant (3,000 hosts ≈ tens of GB through
135 // one broker NIC) unless the operator remembered
136 // `--jitter` on every rollout. 10m amortises a
137 // 3,000-host fleet to ~5 downloads/s while staying
138 // tolerable for mid-size rollouts. Canary / dev flows
139 // that want the immediate swap opt in explicitly with
140 // `--jitter 0s` (fleet-deploy.ps1 does this for
141 // single-PC deploys).
142 target_version_jitter: "10m".to_string(),
143 heartbeat_interval: "30s".to_string(),
144 // 60 s default: 2× the heartbeat cadence so the chart has
145 // a roughly aligned point every other heartbeat, while
146 // keeping the host-wide sysinfo refresh (which on Citrix /
147 // RDS hosts is the heaviest call we make) out of the
148 // tight 30 s loop.
149 host_perf_interval: "60s".to_string(),
150 // Off by default. Per-process collection walks the full
151 // OS process table — the most expensive sysinfo call —
152 // so the fleet pays nothing until an operator opts a
153 // specific host into "investigation mode".
154 process_perf_enabled: false,
155 process_perf_expires_at: None,
156 process_perf_top_n: 20,
157 }
158 }
159
160 /// Returns true when process-perf collection should actually run
161 /// **right now**: the flag is set AND no expiry has passed.
162 /// Centralised here so agent / backend / SPA all agree on the
163 /// active-vs-expired distinction.
164 pub fn process_perf_active_at(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
165 if !self.process_perf_enabled {
166 return false;
167 }
168 match self.process_perf_expires_at {
169 None => true,
170 Some(deadline) => now < deadline,
171 }
172 }
173
174 /// Parsed `heartbeat_interval`, falling back to the built-in
175 /// 30 s default on a malformed string. Logging the parse error
176 /// is the caller's job (so that test code can stay quiet).
177 pub fn heartbeat_duration(&self) -> Duration {
178 humantime::parse_duration(&self.heartbeat_interval).unwrap_or(Duration::from_secs(30))
179 }
180
181 /// Parsed `host_perf_interval`, falling back to the built-in
182 /// 60 s default on a malformed string.
183 pub fn host_perf_duration(&self) -> Duration {
184 humantime::parse_duration(&self.host_perf_interval).unwrap_or(Duration::from_secs(60))
185 }
186
187 /// Parsed `target_version_jitter`. #491: a malformed string
188 /// falls back to the safe built-in default (10 m), not zero —
189 /// the old ZERO fallback silently turned a `--jitter 30minutes`
190 /// typo into the exact fleet-wide download herd the flag exists
191 /// to prevent. The write boundaries (CLI `config set` /
192 /// `agent rollout`, backend rollout API) now reject malformed
193 /// strings outright, so this fallback only covers values that
194 /// predate that validation.
195 pub fn target_version_jitter_duration(&self) -> Duration {
196 humantime::parse_duration(&self.target_version_jitter)
197 .unwrap_or(Duration::from_secs(10 * 60))
198 }
199}
200
201impl Default for EffectiveConfig {
202 fn default() -> Self {
203 Self::builtin_defaults()
204 }
205}
206
/// Non-fatal observations from [`resolve`] that the caller should
/// log. Currently only "two or more of this PC's groups set the same
/// field" — a useful pre-emptive debugging signal when canary / wave /
/// dept overlays accidentally overlap.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionWarning {
    /// At least two of the PC's groups set `field`. Emitted whenever the
    /// field is set by more than one group, even if every group sets the
    /// same value.
    MultiGroupConflict {
        /// Name of the contested field, spelled as in [`ConfigScope`]
        /// (e.g. `"heartbeat_interval"`).
        field: &'static str,
        /// Group names that set this field, in sorted order (plain
        /// `str` byte-wise ordering, so uppercase sorts before
        /// lowercase). This is also the application order: the last
        /// name in this list is the group whose value won among the
        /// groups. A per-PC scope, applied afterwards, can still
        /// override it.
        groups: Vec<String>,
    },
}
221
222/// Flatten the scope stack into an [`EffectiveConfig`].
223///
224/// * `global` — the `global` key in the `agent_config` bucket
225/// (`None` if no row yet).
226/// * `group_scopes` — every `groups.<name>` row currently in the
227/// bucket (the caller can pass all of them; only the ones whose
228/// name is in `my_groups` are applied).
229/// * `pc_scope` — the `pcs.<pc_id>` row for this agent (`None` if
230/// no row yet).
231/// * `my_groups` — this agent's current memberships (from the
232/// `agent_groups` bucket).
233///
234/// Order of application: built-in default → global → per-group
235/// (alphabetical, last wins) → per-pc. Multi-group conflicts (≥ 2
236/// of `my_groups` setting the same field) are returned as warnings
237/// alongside the resolved config.
238pub fn resolve(
239 global: Option<&ConfigScope>,
240 group_scopes: &BTreeMap<String, ConfigScope>,
241 pc_scope: Option<&ConfigScope>,
242 my_groups: &[String],
243) -> (EffectiveConfig, Vec<ResolutionWarning>) {
244 let mut out = EffectiveConfig::builtin_defaults();
245 let mut warnings = Vec::new();
246
247 if let Some(g) = global {
248 apply_scope(&mut out, g);
249 }
250
251 // Sort + dedup the group list so iteration order is deterministic
252 // and "last wins" is well-defined.
253 let mut sorted_groups: Vec<&str> = my_groups.iter().map(String::as_str).collect();
254 sorted_groups.sort();
255 sorted_groups.dedup();
256
257 // Pass 1: find multi-setter fields so the caller can warn before
258 // pass 2 silently lets the alphabetical-last value win.
259 let mut setters: BTreeMap<&'static str, Vec<String>> = BTreeMap::new();
260 for g in &sorted_groups {
261 let Some(scope) = group_scopes.get(*g) else {
262 continue;
263 };
264 if scope.target_version.is_some() {
265 setters
266 .entry("target_version")
267 .or_default()
268 .push(g.to_string());
269 }
270 if scope.target_version_jitter.is_some() {
271 setters
272 .entry("target_version_jitter")
273 .or_default()
274 .push(g.to_string());
275 }
276 if scope.heartbeat_interval.is_some() {
277 setters
278 .entry("heartbeat_interval")
279 .or_default()
280 .push(g.to_string());
281 }
282 if scope.host_perf_interval.is_some() {
283 setters
284 .entry("host_perf_interval")
285 .or_default()
286 .push(g.to_string());
287 }
288 if scope.process_perf_enabled.is_some() {
289 setters
290 .entry("process_perf_enabled")
291 .or_default()
292 .push(g.to_string());
293 }
294 if scope.process_perf_expires_at.is_some() {
295 setters
296 .entry("process_perf_expires_at")
297 .or_default()
298 .push(g.to_string());
299 }
300 if scope.process_perf_top_n.is_some() {
301 setters
302 .entry("process_perf_top_n")
303 .or_default()
304 .push(g.to_string());
305 }
306 }
307 for (field, groups) in setters {
308 if groups.len() > 1 {
309 warnings.push(ResolutionWarning::MultiGroupConflict { field, groups });
310 }
311 }
312
313 // Pass 2: actually apply, alphabetically. Last-wins by construction.
314 for g in &sorted_groups {
315 if let Some(scope) = group_scopes.get(*g) {
316 apply_scope(&mut out, scope);
317 }
318 }
319
320 if let Some(p) = pc_scope {
321 apply_scope(&mut out, p);
322 }
323
324 (out, warnings)
325}
326
327fn apply_scope(out: &mut EffectiveConfig, s: &ConfigScope) {
328 if let Some(v) = &s.target_version {
329 out.target_version = Some(v.clone());
330 }
331 if let Some(v) = &s.target_version_jitter {
332 out.target_version_jitter = v.clone();
333 }
334 if let Some(v) = &s.heartbeat_interval {
335 out.heartbeat_interval = v.clone();
336 }
337 if let Some(v) = &s.host_perf_interval {
338 out.host_perf_interval = v.clone();
339 }
340 if let Some(v) = s.process_perf_enabled {
341 out.process_perf_enabled = v;
342 }
343 if let Some(v) = s.process_perf_expires_at {
344 out.process_perf_expires_at = Some(v);
345 }
346 if let Some(v) = s.process_perf_top_n {
347 out.process_perf_top_n = v;
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    fn scope() -> ConfigScope {
        ConfigScope::default()
    }

    /// Fixed RFC3339 instant so expiry tests don't depend on the wall clock.
    fn ts(rfc3339: &str) -> chrono::DateTime<chrono::Utc> {
        chrono::DateTime::parse_from_rfc3339(rfc3339)
            .expect("test timestamp is valid RFC3339")
            .with_timezone(&chrono::Utc)
    }

    #[test]
    fn empty_stack_gives_builtin_defaults() {
        let (eff, warns) = resolve(None, &BTreeMap::new(), None, &[]);
        assert_eq!(eff, EffectiveConfig::builtin_defaults());
        assert!(warns.is_empty());
    }

    #[test]
    fn global_only() {
        let g = ConfigScope {
            heartbeat_interval: Some("60s".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&g), &BTreeMap::new(), None, &[]);
        assert_eq!(eff.heartbeat_interval, "60s");
        // Unset fields stay at builtin defaults (#491: jitter's
        // builtin default is the safe 10m, not 0s).
        assert_eq!(eff.target_version_jitter, "10m");
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn group_overrides_global() {
        let global = ConfigScope {
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(Some(&global), &groups, None, &["canary".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn pc_overrides_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("30s".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn pc_overrides_global_when_no_group_match() {
        let global = ConfigScope {
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn partial_override_only_changes_named_fields() {
        let global = ConfigScope {
            target_version_jitter: Some("30m".into()),
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let pc = ConfigScope {
            heartbeat_interval: Some("15s".into()),
            // intentionally not touching target_version_jitter
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.target_version_jitter, "30m"); // from global
        assert_eq!(eff.heartbeat_interval, "15s"); // from pc
    }

    #[test]
    fn multi_group_conflict_emits_warning() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                heartbeat_interval: Some("60s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        // "dept-eng" sorts before "wave1", so wave1 wins (last alphabetical).
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(warns.len(), 1);
        match &warns[0] {
            ResolutionWarning::MultiGroupConflict { field, groups } => {
                assert_eq!(*field, "heartbeat_interval");
                assert_eq!(groups, &vec!["dept-eng".to_string(), "wave1".to_string()]);
            }
        }
    }

    #[test]
    fn group_alphabetical_last_wins_no_conflict_when_only_one_sets() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                // Different field — doesn't conflict.
                target_version_jitter: Some("15m".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(eff.target_version_jitter, "15m");
        assert!(warns.is_empty());
    }

    #[test]
    fn unknown_group_is_silently_ignored() {
        // my_groups names a group that has no scope row yet. Common
        // on the first agent that joins a freshly-named group; the
        // resolver should treat it as a no-op, not an error.
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(
            None,
            &groups,
            None,
            &["canary".into(), "ghost-group".into()],
        );
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn group_scope_not_applied_when_pc_not_in_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                target_version: Some("0.3.0".into()),
                ..scope()
            },
        );
        let (eff, _) = resolve(None, &groups, None, &["dept-eng".into()]);
        // PC is NOT in canary, so the rollout target shouldn't apply.
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn duplicate_group_names_dedup_silently() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        // my_groups carries the same name twice — the dedup pass
        // keeps it from looking like a conflict-with-self.
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn pc_scope_beats_conflicting_groups_but_warning_survives() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                heartbeat_interval: Some("60s".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            heartbeat_interval: Some("15s".into()),
            ..scope()
        };
        let (eff, warns) = resolve(
            None,
            &groups,
            Some(&pc),
            &["wave1".into(), "dept-eng".into()],
        );
        // The PC scope is applied after the groups, so it has the final word...
        assert_eq!(eff.heartbeat_interval, "15s");
        // ...but the group overlap is still reported to the caller.
        assert_eq!(
            warns,
            vec![ResolutionWarning::MultiGroupConflict {
                field: "heartbeat_interval",
                groups: vec!["dept-eng".to_string(), "wave1".to_string()],
            }]
        );
    }

    #[test]
    fn process_perf_fields_cascade_like_the_rest() {
        let deadline = ts("2024-05-01T12:30:00Z");
        let global = ConfigScope {
            process_perf_top_n: Some(50),
            ..scope()
        };
        let pc = ConfigScope {
            process_perf_enabled: Some(true),
            process_perf_expires_at: Some(deadline),
            ..scope()
        };
        let (eff, warns) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert!(eff.process_perf_enabled);
        assert_eq!(eff.process_perf_expires_at, Some(deadline));
        assert_eq!(eff.process_perf_top_n, 50);
        assert!(warns.is_empty());
    }

    #[test]
    fn process_perf_active_at_honours_flag_and_deadline() {
        let now = ts("2024-05-01T12:00:00Z");
        let before = ts("2024-05-01T11:30:00Z");
        let after = ts("2024-05-01T12:30:00Z");

        // Flag off: never active, whatever the deadline says.
        let off = EffectiveConfig {
            process_perf_enabled: false,
            process_perf_expires_at: Some(after),
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(!off.process_perf_active_at(now));

        // Flag on, no deadline: on indefinitely.
        let open_ended = EffectiveConfig {
            process_perf_enabled: true,
            process_perf_expires_at: None,
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(open_ended.process_perf_active_at(now));

        let on_until = |deadline| EffectiveConfig {
            process_perf_enabled: true,
            process_perf_expires_at: Some(deadline),
            ..EffectiveConfig::builtin_defaults()
        };
        // Future deadline: still active.
        assert!(on_until(after).process_perf_active_at(now));
        // Deadline reached exactly: expired (strict `<`).
        assert!(!on_until(now).process_perf_active_at(now));
        // Past deadline: expired even though the flag is still true.
        assert!(!on_until(before).process_perf_active_at(now));
    }

    #[test]
    fn config_scope_serde_round_trip() {
        let s = ConfigScope {
            target_version: Some("0.3.0".into()),
            heartbeat_interval: Some("15s".into()),
            ..scope()
        };
        let json = serde_json::to_string(&s).unwrap();
        // Only set fields appear in JSON.
        assert_eq!(
            json,
            r#"{"target_version":"0.3.0","heartbeat_interval":"15s"}"#
        );
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn empty_config_scope_round_trips_as_empty_json() {
        let s = ConfigScope::default();
        assert!(s.is_empty());
        let json = serde_json::to_string(&s).unwrap();
        assert_eq!(json, "{}");
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn explicit_json_null_is_the_same_as_absent() {
        let s: ConfigScope =
            serde_json::from_str(r#"{"heartbeat_interval":null,"process_perf_top_n":null}"#)
                .unwrap();
        assert!(s.is_empty());
        assert_eq!(s, ConfigScope::default());
    }

    #[test]
    fn deserialize_tolerates_unknown_fields_for_forward_compat() {
        // Older agent / backend builds should keep parsing in case
        // we add fields later. v0.20 also relies on this so pre-v0.20
        // rows that still have inventory_interval / inventory_jitter
        // / inventory_enabled in the bucket value parse OK as the
        // new (smaller) ConfigScope — the dropped fields just
        // dissolve into "unknown, ignored".
        let json =
            r#"{"target_version":"0.3.0","inventory_interval":"24h","future_knob":"future_value"}"#;
        let s: ConfigScope = serde_json::from_str(json).unwrap();
        assert_eq!(s.target_version.as_deref(), Some("0.3.0"));
    }

    #[test]
    fn pc_does_not_override_other_pcs() {
        // Sanity: pc_scope passed in is by definition the row for THIS
        // pc; the caller is responsible for picking the right one.
        // This test guards against a future refactor that accidentally
        // wires in the wrong scope by ensuring the apply happens last
        // (after groups), so the PC value is the visible one.
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("30s".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn interval_durations_parse_and_fall_back_on_malformed() {
        let eff = EffectiveConfig {
            heartbeat_interval: "15s".into(),
            host_perf_interval: "2m".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(eff.heartbeat_duration(), Duration::from_secs(15));
        assert_eq!(eff.host_perf_duration(), Duration::from_secs(120));

        let broken = EffectiveConfig {
            heartbeat_interval: "not-a-duration".into(),
            host_perf_interval: "also-not-a-duration".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(broken.heartbeat_duration(), Duration::from_secs(30));
        assert_eq!(broken.host_perf_duration(), Duration::from_secs(60));
    }

    #[test]
    fn builtin_default_strings_parse_to_the_fallback_durations() {
        // The *_duration() fallbacks are hard-coded Durations; make sure
        // the builtin strings parse on their own and agree with them, so
        // the two can't silently drift apart.
        let eff = EffectiveConfig::default();
        assert_eq!(
            humantime::parse_duration(&eff.heartbeat_interval).unwrap(),
            Duration::from_secs(30)
        );
        assert_eq!(
            humantime::parse_duration(&eff.host_perf_interval).unwrap(),
            Duration::from_secs(60)
        );
        assert_eq!(
            humantime::parse_duration(&eff.target_version_jitter).unwrap(),
            Duration::from_secs(10 * 60)
        );
    }

    #[test]
    fn malformed_jitter_falls_back_to_safe_default_not_zero() {
        // #491: pre-fix this fell back to ZERO, silently turning a
        // typo'd jitter into a fleet-wide simultaneous download.
        // (Note "30minutes" is VALID humantime — full unit names
        // parse — so the malformed sample must be genuinely broken.)
        let eff = EffectiveConfig {
            target_version_jitter: "not-a-duration".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(
            eff.target_version_jitter_duration(),
            Duration::from_secs(10 * 60),
        );
        // Explicit 0s remains an honoured opt-in.
        let zero = EffectiveConfig {
            target_version_jitter: "0s".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(zero.target_version_jitter_duration(), Duration::ZERO);
    }
}