kanade_shared/wire/agent_config.rs
1//! Layered fleet configuration that lives in the `agent_config` KV
2//! bucket (Sprint 6).
3//!
4//! Three scopes flow into the agent's effective config, in order of
5//! increasing specificity:
6//!
7//! ```text
8//! built-in default (compiled in; floor when nothing else is set)
9//! ↓
10//! agent_config:global (whole-fleet default)
11//! ↓
12//! agent_config:groups.<g> (per-group override; one or more apply)
13//! ↓
14//! agent_config:pcs.<pc> (per-PC override; final word)
15//! ```
16//!
17//! The wire type for every scope is the same — [`ConfigScope`], a
18//! struct of `Option<T>` fields. `Some` means "this scope sets this
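//! field"; `None` means "fall through to the next layer". On the wire, a
//! JSON `null` and an absent field both deserialize to `None`: serde maps
//! `null` to `None` for `Option` fields, and the struct-level
//! `#[serde(default)]` fills in fields that are missing entirely.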
22//!
//! [`resolve`] is the pure functional core that flattens the scope
//! stack into an [`EffectiveConfig`] (concrete values; only
//! `target_version` and `process_perf_expires_at` stay `Option`, because
//! "unset everywhere" is meaningful for them). When the same field is set
//! by more than one of the PC's groups, groups are applied in alphabetical
//! order so the alphabetically last one wins (CSS-cascade style), and a
//! [`ResolutionWarning::MultiGroupConflict`] is emitted so the caller can
//! log it. That pre-empts the "why does this PC have value X? none of my
//! groups say X" debugging session.
30//!
31//! v0.20.0: `inventory_interval` / `inventory_jitter` /
32//! `inventory_enabled` removed. They were leftovers from the
//! v0.14-retired hardcoded WMI inventory loop; runtime inventory now
//! lives in operator-defined probe jobs
//! (`configs/jobs/inventory-*.yaml`), so the layered config no longer
//! carries anything about it.
37
38use std::collections::BTreeMap;
39use std::time::Duration;
40
41use serde::{Deserialize, Serialize};
42
43/// Per-scope partial config. Every field is `Option<T>`: `Some` =
44/// set, `None` = inherit from the next-less-specific scope. Serde
45/// `default` + `skip_serializing_if` keeps the wire JSON tight —
46/// unset fields don't appear in the bucket value.
47#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
48#[serde(default)]
49pub struct ConfigScope {
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub target_version: Option<String>,
52 /// Random sleep window applied at each agent before it starts
53 /// downloading a new target_version, so a fleet-wide rollout
54 /// doesn't slam the Object Store / broker all at once
55 /// (humantime, e.g. `"30m"`). `"0s"` (or unset) = no jitter.
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub target_version_jitter: Option<String>,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub heartbeat_interval: Option<String>,
60 /// Cadence for the whole-host perf snapshot loop (`host_perf.<pc_id>`).
61 /// Separate from `heartbeat_interval` because the host-wide
62 /// sysinfo refresh is slightly heavier than the per-process self-
63 /// perf one (memory + disk + network counters in addition to CPU)
64 /// and gappier data is acceptable for graphing. Default 60 s.
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub host_perf_interval: Option<String>,
67 /// v0.41 / Phase 2: operator-driven opt-in for the heavy per-
68 /// process snapshot loop (`process_perf.<pc_id>`). Default off
69 /// because walking the full process table is the most expensive
70 /// sysinfo call on Citrix / RDS hosts; flip on only when an
71 /// operator is actively investigating a host. Paired with
72 /// `process_perf_expires_at` to auto-disable after a window —
73 /// see [`EffectiveConfig::process_perf_active_at`].
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub process_perf_enabled: Option<bool>,
76 /// Wall-clock RFC3339 timestamp after which `process_perf_enabled`
77 /// is considered expired and the agent stops publishing process
78 /// snapshots — even if the flag itself is still `true`. Lets the
79 /// SPA toggle "ON for 30 m" without the operator having to come
80 /// back and clear the flag manually. `None` (or the past) +
81 /// enabled=true means "indefinitely on" (rare; mostly a test path).
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
84 /// Top-N processes (ordered by CPU%) the agent publishes per tick.
85 /// 20 by default — enough to cover the usual suspects on a
86 /// constrained host without ballooning the projector row volume
87 /// when several PCs are simultaneously in investigation mode.
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub process_perf_top_n: Option<u32>,
90}
91
92impl ConfigScope {
93 pub fn is_empty(&self) -> bool {
94 self.target_version.is_none()
95 && self.target_version_jitter.is_none()
96 && self.heartbeat_interval.is_none()
97 && self.host_perf_interval.is_none()
98 && self.process_perf_enabled.is_none()
99 && self.process_perf_expires_at.is_none()
100 && self.process_perf_top_n.is_none()
101 }
102}
103
104/// Concrete config the agent runs against once the scope stack has
105/// been flattened. `target_version` stays `Option` because "no
106/// rollout target set anywhere" is a meaningful state (the agent
107/// just keeps running the version it has); the other fields always
108/// have a value, falling back to [`EffectiveConfig::builtin_defaults`]
109/// when no scope sets them.
110#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
111pub struct EffectiveConfig {
112 pub target_version: Option<String>,
113 pub target_version_jitter: String,
114 pub heartbeat_interval: String,
115 pub host_perf_interval: String,
116 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_enabled`].
117 pub process_perf_enabled: bool,
118 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_expires_at`].
119 pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
120 /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_top_n`].
121 pub process_perf_top_n: u32,
122}
123
124impl EffectiveConfig {
125 /// Floor values used when no KV scope sets a given field.
126 pub fn builtin_defaults() -> Self {
127 Self {
128 target_version: None,
129 // 0s = "no jitter" = pre-Sprint-11 behaviour. Operators
130 // running ≥ 100-host fleets are expected to bump this
131 // (via `kanade agent rollout … --jitter 30m` or
132 // `kanade config set target_version_jitter=30m`) so the
133 // Object Store fan-out doesn't synchronise. See issue
134 // #26 for the broader "safe-by-default" debate.
135 target_version_jitter: "0s".to_string(),
136 heartbeat_interval: "30s".to_string(),
137 // 60 s default: 2× the heartbeat cadence so the chart has
138 // a roughly aligned point every other heartbeat, while
139 // keeping the host-wide sysinfo refresh (which on Citrix /
140 // RDS hosts is the heaviest call we make) out of the
141 // tight 30 s loop.
142 host_perf_interval: "60s".to_string(),
143 // Off by default. Per-process collection walks the full
144 // OS process table — the most expensive sysinfo call —
145 // so the fleet pays nothing until an operator opts a
146 // specific host into "investigation mode".
147 process_perf_enabled: false,
148 process_perf_expires_at: None,
149 process_perf_top_n: 20,
150 }
151 }
152
153 /// Returns true when process-perf collection should actually run
154 /// **right now**: the flag is set AND no expiry has passed.
155 /// Centralised here so agent / backend / SPA all agree on the
156 /// active-vs-expired distinction.
157 pub fn process_perf_active_at(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
158 if !self.process_perf_enabled {
159 return false;
160 }
161 match self.process_perf_expires_at {
162 None => true,
163 Some(deadline) => now < deadline,
164 }
165 }
166
167 /// Parsed `heartbeat_interval`, falling back to the built-in
168 /// 30 s default on a malformed string. Logging the parse error
169 /// is the caller's job (so that test code can stay quiet).
170 pub fn heartbeat_duration(&self) -> Duration {
171 humantime::parse_duration(&self.heartbeat_interval).unwrap_or(Duration::from_secs(30))
172 }
173
174 /// Parsed `host_perf_interval`, falling back to the built-in
175 /// 60 s default on a malformed string.
176 pub fn host_perf_duration(&self) -> Duration {
177 humantime::parse_duration(&self.host_perf_interval).unwrap_or(Duration::from_secs(60))
178 }
179
180 /// Parsed `target_version_jitter`, falling back to zero (= no
181 /// jitter) on a malformed string. Zero means "start downloading
182 /// immediately when target_version drifts" — fine for small
183 /// fleets / canary smoke tests, bad for 3000 hosts.
184 pub fn target_version_jitter_duration(&self) -> Duration {
185 humantime::parse_duration(&self.target_version_jitter).unwrap_or(Duration::ZERO)
186 }
187}
188
189impl Default for EffectiveConfig {
190 fn default() -> Self {
191 Self::builtin_defaults()
192 }
193}
194
/// Non-fatal observations from [`resolve`] that the caller should
/// log. Currently only "two or more of this PC's groups set the same
/// field" — useful pre-emptive debugging signal when canary / wave /
/// dept overlays accidentally overlap.
///
/// The check looks at *which* groups set a field, not at the values:
/// the warning is emitted even when every group sets the same value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionWarning {
    /// More than one of the PC's groups set `field`.
    MultiGroupConflict {
        /// Name of the `ConfigScope` field that several groups set.
        field: &'static str,
        /// Group names that set this field, in alphabetical order
        /// (i.e. the application order — the last name in this list
        /// is the one whose value actually won).
        groups: Vec<String>,
    },
}
209
210/// Flatten the scope stack into an [`EffectiveConfig`].
211///
212/// * `global` — the `global` key in the `agent_config` bucket
213/// (`None` if no row yet).
214/// * `group_scopes` — every `groups.<name>` row currently in the
215/// bucket (the caller can pass all of them; only the ones whose
216/// name is in `my_groups` are applied).
217/// * `pc_scope` — the `pcs.<pc_id>` row for this agent (`None` if
218/// no row yet).
219/// * `my_groups` — this agent's current memberships (from the
220/// `agent_groups` bucket).
221///
222/// Order of application: built-in default → global → per-group
223/// (alphabetical, last wins) → per-pc. Multi-group conflicts (≥ 2
224/// of `my_groups` setting the same field) are returned as warnings
225/// alongside the resolved config.
226pub fn resolve(
227 global: Option<&ConfigScope>,
228 group_scopes: &BTreeMap<String, ConfigScope>,
229 pc_scope: Option<&ConfigScope>,
230 my_groups: &[String],
231) -> (EffectiveConfig, Vec<ResolutionWarning>) {
232 let mut out = EffectiveConfig::builtin_defaults();
233 let mut warnings = Vec::new();
234
235 if let Some(g) = global {
236 apply_scope(&mut out, g);
237 }
238
239 // Sort + dedup the group list so iteration order is deterministic
240 // and "last wins" is well-defined.
241 let mut sorted_groups: Vec<&str> = my_groups.iter().map(String::as_str).collect();
242 sorted_groups.sort();
243 sorted_groups.dedup();
244
245 // Pass 1: find multi-setter fields so the caller can warn before
246 // pass 2 silently lets the alphabetical-last value win.
247 let mut setters: BTreeMap<&'static str, Vec<String>> = BTreeMap::new();
248 for g in &sorted_groups {
249 let Some(scope) = group_scopes.get(*g) else {
250 continue;
251 };
252 if scope.target_version.is_some() {
253 setters
254 .entry("target_version")
255 .or_default()
256 .push(g.to_string());
257 }
258 if scope.target_version_jitter.is_some() {
259 setters
260 .entry("target_version_jitter")
261 .or_default()
262 .push(g.to_string());
263 }
264 if scope.heartbeat_interval.is_some() {
265 setters
266 .entry("heartbeat_interval")
267 .or_default()
268 .push(g.to_string());
269 }
270 if scope.host_perf_interval.is_some() {
271 setters
272 .entry("host_perf_interval")
273 .or_default()
274 .push(g.to_string());
275 }
276 if scope.process_perf_enabled.is_some() {
277 setters
278 .entry("process_perf_enabled")
279 .or_default()
280 .push(g.to_string());
281 }
282 if scope.process_perf_expires_at.is_some() {
283 setters
284 .entry("process_perf_expires_at")
285 .or_default()
286 .push(g.to_string());
287 }
288 if scope.process_perf_top_n.is_some() {
289 setters
290 .entry("process_perf_top_n")
291 .or_default()
292 .push(g.to_string());
293 }
294 }
295 for (field, groups) in setters {
296 if groups.len() > 1 {
297 warnings.push(ResolutionWarning::MultiGroupConflict { field, groups });
298 }
299 }
300
301 // Pass 2: actually apply, alphabetically. Last-wins by construction.
302 for g in &sorted_groups {
303 if let Some(scope) = group_scopes.get(*g) {
304 apply_scope(&mut out, scope);
305 }
306 }
307
308 if let Some(p) = pc_scope {
309 apply_scope(&mut out, p);
310 }
311
312 (out, warnings)
313}
314
315fn apply_scope(out: &mut EffectiveConfig, s: &ConfigScope) {
316 if let Some(v) = &s.target_version {
317 out.target_version = Some(v.clone());
318 }
319 if let Some(v) = &s.target_version_jitter {
320 out.target_version_jitter = v.clone();
321 }
322 if let Some(v) = &s.heartbeat_interval {
323 out.heartbeat_interval = v.clone();
324 }
325 if let Some(v) = &s.host_perf_interval {
326 out.host_perf_interval = v.clone();
327 }
328 if let Some(v) = s.process_perf_enabled {
329 out.process_perf_enabled = v;
330 }
331 if let Some(v) = s.process_perf_expires_at {
332 out.process_perf_expires_at = Some(v);
333 }
334 if let Some(v) = s.process_perf_top_n {
335 out.process_perf_top_n = v;
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    fn scope() -> ConfigScope {
        ConfigScope::default()
    }

    /// Fixed wall-clock instants keep the expiry tests deterministic.
    fn ts(rfc3339: &str) -> chrono::DateTime<chrono::Utc> {
        rfc3339
            .parse()
            .expect("test timestamp must be valid RFC3339")
    }

    #[test]
    fn empty_stack_gives_builtin_defaults() {
        let (eff, warns) = resolve(None, &BTreeMap::new(), None, &[]);
        assert_eq!(eff, EffectiveConfig::builtin_defaults());
        assert!(warns.is_empty());
    }

    #[test]
    fn global_only() {
        let g = ConfigScope {
            heartbeat_interval: Some("60s".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&g), &BTreeMap::new(), None, &[]);
        assert_eq!(eff.heartbeat_interval, "60s");
        // Unset fields stay at builtin defaults.
        assert_eq!(eff.target_version_jitter, "0s");
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn group_overrides_global() {
        let global = ConfigScope {
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(Some(&global), &groups, None, &["canary".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn pc_overrides_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("30s".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn pc_overrides_global_when_no_group_match() {
        let global = ConfigScope {
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn partial_override_only_changes_named_fields() {
        let global = ConfigScope {
            target_version_jitter: Some("30m".into()),
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let pc = ConfigScope {
            heartbeat_interval: Some("15s".into()),
            // intentionally not touching target_version_jitter
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.target_version_jitter, "30m"); // from global
        assert_eq!(eff.heartbeat_interval, "15s"); // from pc
    }

    #[test]
    fn multi_group_conflict_emits_warning() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                heartbeat_interval: Some("60s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        // "dept-eng" sorts before "wave1", so wave1 wins (last alphabetical).
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(warns.len(), 1);
        match &warns[0] {
            ResolutionWarning::MultiGroupConflict { field, groups } => {
                assert_eq!(*field, "heartbeat_interval");
                assert_eq!(groups, &vec!["dept-eng".to_string(), "wave1".to_string()]);
            }
        }
    }

    #[test]
    fn multi_group_conflict_warns_even_when_values_agree() {
        // The check is on *who* sets a field, not on the values.
        let mut groups = BTreeMap::new();
        for name in ["alpha", "beta"] {
            groups.insert(
                name.to_string(),
                ConfigScope {
                    process_perf_top_n: Some(50),
                    ..scope()
                },
            );
        }
        let (eff, warns) = resolve(None, &groups, None, &["beta".into(), "alpha".into()]);
        assert_eq!(eff.process_perf_top_n, 50);
        assert_eq!(
            warns,
            vec![ResolutionWarning::MultiGroupConflict {
                field: "process_perf_top_n",
                groups: vec!["alpha".to_string(), "beta".to_string()],
            }]
        );
    }

    #[test]
    fn group_alphabetical_last_wins_no_conflict_when_only_one_sets() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                // Different field — doesn't conflict.
                target_version_jitter: Some("15m".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(eff.target_version_jitter, "15m");
        assert!(warns.is_empty());
    }

    #[test]
    fn unknown_group_is_silently_ignored() {
        // my_groups names a group that has no scope row yet. Common
        // on the first agent that joins a freshly-named group; the
        // resolver should treat it as a no-op, not an error.
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(
            None,
            &groups,
            None,
            &["canary".into(), "ghost-group".into()],
        );
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn group_scope_not_applied_when_pc_not_in_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                target_version: Some("0.3.0".into()),
                ..scope()
            },
        );
        let (eff, _) = resolve(None, &groups, None, &["dept-eng".into()]);
        // PC is NOT in canary, so the rollout target shouldn't apply.
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn duplicate_group_names_dedup_silently() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("5s".into()),
                ..scope()
            },
        );
        // my_groups carries the same name twice — the dedup pass
        // keeps it from looking like a conflict-with-self.
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn process_perf_fields_cascade_independently() {
        // A group opens an investigation window; the PC row only tweaks
        // top-N. The window inherited from the group must survive.
        let deadline = ts("2025-01-01T00:30:00Z");
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                process_perf_enabled: Some(true),
                process_perf_expires_at: Some(deadline),
                ..scope()
            },
        );
        let pc = ConfigScope {
            process_perf_top_n: Some(50),
            ..scope()
        };
        let (eff, warns) = resolve(None, &groups, Some(&pc), &["canary".into()]);
        assert!(warns.is_empty());
        assert!(eff.process_perf_enabled);
        assert_eq!(eff.process_perf_expires_at, Some(deadline));
        assert_eq!(eff.process_perf_top_n, 50);
        assert!(eff.process_perf_active_at(ts("2025-01-01T00:00:00Z")));
    }

    #[test]
    fn process_perf_inactive_when_disabled_even_before_deadline() {
        let eff = EffectiveConfig {
            process_perf_enabled: false,
            process_perf_expires_at: Some(ts("2030-01-01T00:00:00Z")),
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(!eff.process_perf_active_at(ts("2025-01-01T00:00:00Z")));
    }

    #[test]
    fn process_perf_active_indefinitely_without_deadline() {
        let eff = EffectiveConfig {
            process_perf_enabled: true,
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(eff.process_perf_active_at(ts("2025-01-01T00:00:00Z")));
        assert!(eff.process_perf_active_at(ts("2099-12-31T23:59:59Z")));
    }

    #[test]
    fn process_perf_deadline_is_exclusive() {
        let deadline = ts("2025-01-01T00:30:00Z");
        let eff = EffectiveConfig {
            process_perf_enabled: true,
            process_perf_expires_at: Some(deadline),
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(eff.process_perf_active_at(ts("2025-01-01T00:29:59Z")));
        // `now < deadline` is strict: the deadline itself is already expired.
        assert!(!eff.process_perf_active_at(deadline));
        // A deadline in the past means OFF, even with the flag still set.
        assert!(!eff.process_perf_active_at(ts("2025-01-01T01:00:00Z")));
    }

    #[test]
    fn duration_helpers_parse_and_fall_back() {
        let defaults = EffectiveConfig::builtin_defaults();
        assert_eq!(defaults.heartbeat_duration(), Duration::from_secs(30));
        assert_eq!(defaults.host_perf_duration(), Duration::from_secs(60));
        assert_eq!(defaults.target_version_jitter_duration(), Duration::ZERO);

        let tuned = EffectiveConfig {
            heartbeat_interval: "15s".into(),
            host_perf_interval: "2m".into(),
            target_version_jitter: "30m".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(tuned.heartbeat_duration(), Duration::from_secs(15));
        assert_eq!(tuned.host_perf_duration(), Duration::from_secs(120));
        assert_eq!(tuned.target_version_jitter_duration(), Duration::from_secs(1800));

        let garbled = EffectiveConfig {
            heartbeat_interval: "soon".into(),
            host_perf_interval: String::new(),
            target_version_jitter: "lots".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(garbled.heartbeat_duration(), Duration::from_secs(30));
        assert_eq!(garbled.host_perf_duration(), Duration::from_secs(60));
        assert_eq!(garbled.target_version_jitter_duration(), Duration::ZERO);
    }

    #[test]
    fn config_scope_serde_round_trip() {
        let s = ConfigScope {
            target_version: Some("0.3.0".into()),
            heartbeat_interval: Some("15s".into()),
            ..scope()
        };
        let json = serde_json::to_string(&s).unwrap();
        // Only set fields appear in JSON.
        assert_eq!(
            json,
            r#"{"target_version":"0.3.0","heartbeat_interval":"15s"}"#
        );
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn process_perf_fields_serde_round_trip() {
        let s = ConfigScope {
            process_perf_enabled: Some(true),
            process_perf_expires_at: Some(ts("2025-01-01T00:30:00Z")),
            process_perf_top_n: Some(50),
            ..scope()
        };
        let json = serde_json::to_string(&s).unwrap();
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
        assert!(!back.is_empty());
    }

    #[test]
    fn empty_config_scope_round_trips_as_empty_json() {
        let s = ConfigScope::default();
        assert!(s.is_empty());
        let json = serde_json::to_string(&s).unwrap();
        assert_eq!(json, "{}");
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn deserialize_tolerates_unknown_fields_for_forward_compat() {
        // Older agent / backend builds should keep parsing in case
        // we add fields later. v0.20 also relies on this so pre-v0.20
        // rows that still have inventory_interval / inventory_jitter
        // / inventory_enabled in the bucket value parse OK as the
        // new (smaller) ConfigScope — the dropped fields just
        // dissolve into "unknown, ignored".
        let json =
            r#"{"target_version":"0.3.0","inventory_interval":"24h","future_knob":"future_value"}"#;
        let s: ConfigScope = serde_json::from_str(json).unwrap();
        assert_eq!(s.target_version.as_deref(), Some("0.3.0"));
    }

    #[test]
    fn pc_does_not_override_other_pcs() {
        // Sanity: pc_scope passed in is by definition the row for THIS
        // pc; the caller is responsible for picking the right one.
        // This test guards against a future refactor that accidentally
        // wires in the wrong scope by ensuring the apply happens last
        // (after groups), so the PC value is the visible one.
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                heartbeat_interval: Some("30s".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            heartbeat_interval: Some("5s".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }
}
606}