Skip to main content

kanade_shared/wire/
agent_config.rs

//! Layered fleet configuration that lives in the `agent_config` KV
//! bucket (Sprint 6).
//!
//! Three scopes flow into the agent's effective config, in order of
//! increasing specificity:
//!
//! ```text
//! built-in default        (compiled in; floor when nothing else is set)
//!   ↓
//! agent_config:global     (whole-fleet default)
//!   ↓
//! agent_config:groups.<g> (per-group override; one or more apply)
//!   ↓
//! agent_config:pcs.<pc>   (per-PC override; final word)
//! ```
//!
//! The wire type for every scope is the same — [`ConfigScope`], a
//! struct of `Option<T>` fields. `Some` means "this scope sets this
//! field"; `None` means "fall through to the next layer". A JSON
//! `null` and an absent field both deserialize to `None`: `Option`
//! itself accepts `null`, and serde's struct-level `default` covers
//! fields that are missing from the object.
//!
//! [`resolve`] is the pure functional core that flattens the scope
//! stack into an [`EffectiveConfig`] (concrete values, no Options).
//! When the same field is set on more than one group the PC belongs
//! to, groups are applied in alphabetical order, so the
//! alphabetically last group wins (CSS-cascade style), and a
//! [`ResolutionWarning::MultiGroupConflict`] is emitted so the
//! caller can log it — pre-empts the "why does this PC have value X?
//! none of my groups say X" debugging session.
//!
//! v0.20.0: `inventory_interval` / `inventory_jitter` /
//! `inventory_enabled` removed. They were leftovers from the
//! v0.14-retired hardcoded WMI inventory loop; runtime inventory
//! now lives in operator-defined probe jobs (`configs/jobs/
//! inventory-*.yaml`), so the layered config no longer carries
//! anything about it.

use std::collections::BTreeMap;
use std::time::Duration;

use serde::{Deserialize, Serialize};

43/// Per-scope partial config. Every field is `Option<T>`: `Some` =
44/// set, `None` = inherit from the next-less-specific scope. Serde
45/// `default` + `skip_serializing_if` keeps the wire JSON tight —
46/// unset fields don't appear in the bucket value.
47#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
48#[serde(default)]
49pub struct ConfigScope {
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub target_version: Option<String>,
52    /// Random sleep window applied at each agent before it starts
53    /// downloading a new target_version, so a fleet-wide rollout
54    /// doesn't slam the Object Store / broker all at once
55    /// (humantime, e.g. `"30m"`). `"0s"` = no jitter (explicit
56    /// opt-in for canary / single-PC deploys); unset falls back to
57    /// the safe built-in default (10m — #491).
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub target_version_jitter: Option<String>,
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub heartbeat_interval: Option<String>,
62    /// Cadence for the whole-host perf snapshot loop (`host_perf.<pc_id>`).
63    /// Separate from `heartbeat_interval` because the host-wide
64    /// sysinfo refresh is slightly heavier than the per-process self-
65    /// perf one (memory + disk + network counters in addition to CPU)
66    /// and gappier data is acceptable for graphing. Default 60 s.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub host_perf_interval: Option<String>,
69    /// v0.41 / Phase 2: operator-driven opt-in for the heavy per-
70    /// process snapshot loop (`process_perf.<pc_id>`). Default off
71    /// because walking the full process table is the most expensive
72    /// sysinfo call on Citrix / RDS hosts; flip on only when an
73    /// operator is actively investigating a host. Paired with
74    /// `process_perf_expires_at` to auto-disable after a window —
75    /// see [`EffectiveConfig::process_perf_active_at`].
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub process_perf_enabled: Option<bool>,
78    /// Wall-clock RFC3339 timestamp after which `process_perf_enabled`
79    /// is considered expired and the agent stops publishing process
80    /// snapshots — even if the flag itself is still `true`. Lets the
81    /// SPA toggle "ON for 30 m" without the operator having to come
82    /// back and clear the flag manually. `None` (or the past) +
83    /// enabled=true means "indefinitely on" (rare; mostly a test path).
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
86    /// Top-N processes (ordered by CPU%) the agent publishes per tick.
87    /// 20 by default — enough to cover the usual suspects on a
88    /// constrained host without ballooning the projector row volume
89    /// when several PCs are simultaneously in investigation mode.
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub process_perf_top_n: Option<u32>,
92}
93
94impl ConfigScope {
95    pub fn is_empty(&self) -> bool {
96        self.target_version.is_none()
97            && self.target_version_jitter.is_none()
98            && self.heartbeat_interval.is_none()
99            && self.host_perf_interval.is_none()
100            && self.process_perf_enabled.is_none()
101            && self.process_perf_expires_at.is_none()
102            && self.process_perf_top_n.is_none()
103    }
104}
105
106/// Concrete config the agent runs against once the scope stack has
107/// been flattened. `target_version` stays `Option` because "no
108/// rollout target set anywhere" is a meaningful state (the agent
109/// just keeps running the version it has); the other fields always
110/// have a value, falling back to [`EffectiveConfig::builtin_defaults`]
111/// when no scope sets them.
112#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
113pub struct EffectiveConfig {
114    pub target_version: Option<String>,
115    pub target_version_jitter: String,
116    pub heartbeat_interval: String,
117    pub host_perf_interval: String,
118    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_enabled`].
119    pub process_perf_enabled: bool,
120    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_expires_at`].
121    pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
122    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_top_n`].
123    pub process_perf_top_n: u32,
124}
125
126impl EffectiveConfig {
127    /// Floor values used when no KV scope sets a given field.
128    pub fn builtin_defaults() -> Self {
129        Self {
130            target_version: None,
131            // #491: safe-by-default. The pre-Sprint-11 "0s" default
132            // meant a fleet-wide target_version flip made every
133            // agent pull the multi-MB binary from the Object Store
134            // at the same instant (3,000 hosts ≈ tens of GB through
135            // one broker NIC) unless the operator remembered
136            // `--jitter` on every rollout. 10m amortises a
137            // 3,000-host fleet to ~5 downloads/s while staying
138            // tolerable for mid-size rollouts. Canary / dev flows
139            // that want the immediate swap opt in explicitly with
140            // `--jitter 0s` (fleet-deploy.ps1 does this for
141            // single-PC deploys).
142            target_version_jitter: "10m".to_string(),
143            heartbeat_interval: "30s".to_string(),
144            // 60 s default: 2× the heartbeat cadence so the chart has
145            // a roughly aligned point every other heartbeat, while
146            // keeping the host-wide sysinfo refresh (which on Citrix /
147            // RDS hosts is the heaviest call we make) out of the
148            // tight 30 s loop.
149            host_perf_interval: "60s".to_string(),
150            // Off by default. Per-process collection walks the full
151            // OS process table — the most expensive sysinfo call —
152            // so the fleet pays nothing until an operator opts a
153            // specific host into "investigation mode".
154            process_perf_enabled: false,
155            process_perf_expires_at: None,
156            process_perf_top_n: 20,
157        }
158    }
159
160    /// Returns true when process-perf collection should actually run
161    /// **right now**: the flag is set AND no expiry has passed.
162    /// Centralised here so agent / backend / SPA all agree on the
163    /// active-vs-expired distinction.
164    pub fn process_perf_active_at(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
165        if !self.process_perf_enabled {
166            return false;
167        }
168        match self.process_perf_expires_at {
169            None => true,
170            Some(deadline) => now < deadline,
171        }
172    }
173
174    /// Parsed `heartbeat_interval`, falling back to the built-in
175    /// 30 s default on a malformed string. Logging the parse error
176    /// is the caller's job (so that test code can stay quiet).
177    pub fn heartbeat_duration(&self) -> Duration {
178        humantime::parse_duration(&self.heartbeat_interval).unwrap_or(Duration::from_secs(30))
179    }
180
181    /// Parsed `host_perf_interval`, falling back to the built-in
182    /// 60 s default on a malformed string.
183    pub fn host_perf_duration(&self) -> Duration {
184        humantime::parse_duration(&self.host_perf_interval).unwrap_or(Duration::from_secs(60))
185    }
186
187    /// Parsed `target_version_jitter`. #491: a malformed string
188    /// falls back to the safe built-in default (10 m), not zero —
189    /// the old ZERO fallback silently turned a `--jitter 30minutes`
190    /// typo into the exact fleet-wide download herd the flag exists
191    /// to prevent. The write boundaries (CLI `config set` /
192    /// `agent rollout`, backend rollout API) now reject malformed
193    /// strings outright, so this fallback only covers values that
194    /// predate that validation.
195    pub fn target_version_jitter_duration(&self) -> Duration {
196        humantime::parse_duration(&self.target_version_jitter)
197            .unwrap_or(Duration::from_secs(10 * 60))
198    }
199}
200
201impl Default for EffectiveConfig {
202    fn default() -> Self {
203        Self::builtin_defaults()
204    }
205}
206
/// Non-fatal findings produced by [`resolve`] for the caller to log.
///
/// The only variant today flags a field that more than one of this
/// PC's groups sets. The check is presence-based: it fires whenever
/// two or more groups set the field, without comparing the values.
/// It is a pre-emptive debugging signal for canary / wave / dept
/// overlays that accidentally overlap.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionWarning {
    MultiGroupConflict {
        /// Name of the config field that several groups set.
        field: &'static str,
        /// The groups that set this field, sorted alphabetically.
        /// That is also the application order, so the last name in
        /// the list is the group whose value actually won.
        groups: Vec<String>,
    },
}

222/// Flatten the scope stack into an [`EffectiveConfig`].
223///
224/// * `global` — the `global` key in the `agent_config` bucket
225///   (`None` if no row yet).
226/// * `group_scopes` — every `groups.<name>` row currently in the
227///   bucket (the caller can pass all of them; only the ones whose
228///   name is in `my_groups` are applied).
229/// * `pc_scope` — the `pcs.<pc_id>` row for this agent (`None` if
230///   no row yet).
231/// * `my_groups` — this agent's current memberships (from the
232///   `agent_groups` bucket).
233///
234/// Order of application: built-in default → global → per-group
235/// (alphabetical, last wins) → per-pc. Multi-group conflicts (≥ 2
236/// of `my_groups` setting the same field) are returned as warnings
237/// alongside the resolved config.
238pub fn resolve(
239    global: Option<&ConfigScope>,
240    group_scopes: &BTreeMap<String, ConfigScope>,
241    pc_scope: Option<&ConfigScope>,
242    my_groups: &[String],
243) -> (EffectiveConfig, Vec<ResolutionWarning>) {
244    let mut out = EffectiveConfig::builtin_defaults();
245    let mut warnings = Vec::new();
246
247    if let Some(g) = global {
248        apply_scope(&mut out, g);
249    }
250
251    // Sort + dedup the group list so iteration order is deterministic
252    // and "last wins" is well-defined.
253    let mut sorted_groups: Vec<&str> = my_groups.iter().map(String::as_str).collect();
254    sorted_groups.sort();
255    sorted_groups.dedup();
256
257    // Pass 1: find multi-setter fields so the caller can warn before
258    // pass 2 silently lets the alphabetical-last value win.
259    let mut setters: BTreeMap<&'static str, Vec<String>> = BTreeMap::new();
260    for g in &sorted_groups {
261        let Some(scope) = group_scopes.get(*g) else {
262            continue;
263        };
264        if scope.target_version.is_some() {
265            setters
266                .entry("target_version")
267                .or_default()
268                .push(g.to_string());
269        }
270        if scope.target_version_jitter.is_some() {
271            setters
272                .entry("target_version_jitter")
273                .or_default()
274                .push(g.to_string());
275        }
276        if scope.heartbeat_interval.is_some() {
277            setters
278                .entry("heartbeat_interval")
279                .or_default()
280                .push(g.to_string());
281        }
282        if scope.host_perf_interval.is_some() {
283            setters
284                .entry("host_perf_interval")
285                .or_default()
286                .push(g.to_string());
287        }
288        if scope.process_perf_enabled.is_some() {
289            setters
290                .entry("process_perf_enabled")
291                .or_default()
292                .push(g.to_string());
293        }
294        if scope.process_perf_expires_at.is_some() {
295            setters
296                .entry("process_perf_expires_at")
297                .or_default()
298                .push(g.to_string());
299        }
300        if scope.process_perf_top_n.is_some() {
301            setters
302                .entry("process_perf_top_n")
303                .or_default()
304                .push(g.to_string());
305        }
306    }
307    for (field, groups) in setters {
308        if groups.len() > 1 {
309            warnings.push(ResolutionWarning::MultiGroupConflict { field, groups });
310        }
311    }
312
313    // Pass 2: actually apply, alphabetically. Last-wins by construction.
314    for g in &sorted_groups {
315        if let Some(scope) = group_scopes.get(*g) {
316            apply_scope(&mut out, scope);
317        }
318    }
319
320    if let Some(p) = pc_scope {
321        apply_scope(&mut out, p);
322    }
323
324    (out, warnings)
325}
326
327fn apply_scope(out: &mut EffectiveConfig, s: &ConfigScope) {
328    if let Some(v) = &s.target_version {
329        out.target_version = Some(v.clone());
330    }
331    if let Some(v) = &s.target_version_jitter {
332        out.target_version_jitter = v.clone();
333    }
334    if let Some(v) = &s.heartbeat_interval {
335        out.heartbeat_interval = v.clone();
336    }
337    if let Some(v) = &s.host_perf_interval {
338        out.host_perf_interval = v.clone();
339    }
340    if let Some(v) = s.process_perf_enabled {
341        out.process_perf_enabled = v;
342    }
343    if let Some(v) = s.process_perf_expires_at {
344        out.process_perf_expires_at = Some(v);
345    }
346    if let Some(v) = s.process_perf_top_n {
347        out.process_perf_top_n = v;
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty scope, used as the base for struct-update syntax.
    fn scope() -> ConfigScope {
        ConfigScope::default()
    }

    /// Scope that sets only `heartbeat_interval`.
    fn heartbeat(value: &str) -> ConfigScope {
        ConfigScope {
            heartbeat_interval: Some(value.into()),
            ..scope()
        }
    }

    /// Builds the `groups.<name>` table from `(name, scope)` pairs.
    fn group_table(entries: &[(&str, ConfigScope)]) -> BTreeMap<String, ConfigScope> {
        entries
            .iter()
            .map(|(name, s)| ((*name).to_owned(), s.clone()))
            .collect()
    }

    /// Builds a `my_groups` membership list.
    fn members(names: &[&str]) -> Vec<String> {
        names.iter().map(|n| (*n).to_owned()).collect()
    }

    /// Parses an RFC3339 timestamp into a UTC instant.
    fn utc(rfc3339: &str) -> chrono::DateTime<chrono::Utc> {
        rfc3339.parse().expect("test timestamp must be valid RFC3339")
    }

    #[test]
    fn empty_stack_gives_builtin_defaults() {
        let (eff, warns) = resolve(None, &BTreeMap::new(), None, &[]);
        assert_eq!(eff, EffectiveConfig::builtin_defaults());
        assert!(warns.is_empty());
    }

    #[test]
    fn global_only() {
        let g = heartbeat("60s");
        let (eff, _) = resolve(Some(&g), &BTreeMap::new(), None, &[]);
        assert_eq!(eff.heartbeat_interval, "60s");
        // Unset fields stay at builtin defaults (#491: jitter's
        // builtin default is the safe 10m, not 0s).
        assert_eq!(eff.target_version_jitter, "10m");
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn group_overrides_global() {
        let global = heartbeat("30s");
        let groups = group_table(&[("canary", heartbeat("5s"))]);
        let (eff, warns) = resolve(Some(&global), &groups, None, &members(&["canary"]));
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn pc_overrides_group() {
        let groups = group_table(&[("wave1", heartbeat("30s"))]);
        let pc = heartbeat("5s");
        let (eff, _) = resolve(None, &groups, Some(&pc), &members(&["wave1"]));
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn pc_overrides_global_when_no_group_match() {
        let global = heartbeat("30s");
        let pc = heartbeat("5s");
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn partial_override_only_changes_named_fields() {
        let global = ConfigScope {
            target_version_jitter: Some("30m".into()),
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        // The PC scope intentionally does not touch target_version_jitter.
        let pc = heartbeat("15s");
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.target_version_jitter, "30m"); // from global
        assert_eq!(eff.heartbeat_interval, "15s"); // from pc
    }

    #[test]
    fn multi_group_conflict_emits_warning() {
        let groups = group_table(&[("wave1", heartbeat("5s")), ("dept-eng", heartbeat("60s"))]);
        let (eff, warns) = resolve(None, &groups, None, &members(&["wave1", "dept-eng"]));
        // "dept-eng" sorts before "wave1", so wave1 wins (last alphabetical).
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(warns.len(), 1);
        match &warns[0] {
            ResolutionWarning::MultiGroupConflict { field, groups } => {
                assert_eq!(*field, "heartbeat_interval");
                assert_eq!(groups, &vec!["dept-eng".to_string(), "wave1".to_string()]);
            }
        }
    }

    #[test]
    fn group_alphabetical_last_wins_no_conflict_when_only_one_sets() {
        let groups = group_table(&[
            ("wave1", heartbeat("5s")),
            (
                "dept-eng",
                ConfigScope {
                    // Different field — doesn't conflict.
                    target_version_jitter: Some("15m".into()),
                    ..scope()
                },
            ),
        ]);
        let (eff, warns) = resolve(None, &groups, None, &members(&["wave1", "dept-eng"]));
        assert_eq!(eff.heartbeat_interval, "5s");
        assert_eq!(eff.target_version_jitter, "15m");
        assert!(warns.is_empty());
    }

    #[test]
    fn unknown_group_is_silently_ignored() {
        // my_groups names a group that has no scope row yet. Common
        // on the first agent that joins a freshly-named group; the
        // resolver should treat it as a no-op, not an error.
        let groups = group_table(&[("canary", heartbeat("5s"))]);
        let (eff, warns) = resolve(None, &groups, None, &members(&["canary", "ghost-group"]));
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn group_scope_not_applied_when_pc_not_in_group() {
        let groups = group_table(&[(
            "canary",
            ConfigScope {
                target_version: Some("0.3.0".into()),
                ..scope()
            },
        )]);
        let (eff, _) = resolve(None, &groups, None, &members(&["dept-eng"]));
        // PC is NOT in canary, so the rollout target shouldn't apply.
        assert!(eff.target_version.is_none());
    }

    #[test]
    fn duplicate_group_names_dedup_silently() {
        let groups = group_table(&[("wave1", heartbeat("5s"))]);
        // my_groups carries the same name twice — the dedup pass
        // keeps it from looking like a conflict-with-self.
        let (eff, warns) = resolve(None, &groups, None, &members(&["wave1", "wave1"]));
        assert_eq!(eff.heartbeat_interval, "5s");
        assert!(warns.is_empty());
    }

    #[test]
    fn config_scope_serde_round_trip() {
        let s = ConfigScope {
            target_version: Some("0.3.0".into()),
            heartbeat_interval: Some("15s".into()),
            ..scope()
        };
        let json = serde_json::to_string(&s).unwrap();
        // Only set fields appear in JSON.
        assert_eq!(
            json,
            r#"{"target_version":"0.3.0","heartbeat_interval":"15s"}"#
        );
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn empty_config_scope_round_trips_as_empty_json() {
        let s = ConfigScope::default();
        assert!(s.is_empty());
        let json = serde_json::to_string(&s).unwrap();
        assert_eq!(json, "{}");
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn is_empty_is_false_once_any_field_is_set() {
        assert!(!heartbeat("5s").is_empty());
        let top_n_only = ConfigScope {
            process_perf_top_n: Some(5),
            ..scope()
        };
        assert!(!top_n_only.is_empty());
        let expiry_only = ConfigScope {
            process_perf_expires_at: Some(utc("2024-01-01T00:30:00Z")),
            ..scope()
        };
        assert!(!expiry_only.is_empty());
    }

    #[test]
    fn deserialize_tolerates_unknown_fields_for_forward_compat() {
        // Older agent / backend builds should keep parsing in case
        // we add fields later. v0.20 also relies on this so pre-v0.20
        // rows that still have inventory_interval / inventory_jitter
        // / inventory_enabled in the bucket value parse OK as the
        // new (smaller) ConfigScope — the dropped fields just
        // dissolve into "unknown, ignored".
        let json =
            r#"{"target_version":"0.3.0","inventory_interval":"24h","future_knob":"future_value"}"#;
        let s: ConfigScope = serde_json::from_str(json).unwrap();
        assert_eq!(s.target_version.as_deref(), Some("0.3.0"));
    }

    #[test]
    fn pc_does_not_override_other_pcs() {
        // Sanity: pc_scope passed in is by definition the row for THIS
        // pc; the caller is responsible for picking the right one.
        // This test guards against a future refactor that accidentally
        // wires in the wrong scope by ensuring the apply happens last
        // (after groups), so the PC value is the visible one.
        let groups = group_table(&[("wave1", heartbeat("30s"))]);
        let pc = heartbeat("5s");
        let (eff, _) = resolve(None, &groups, Some(&pc), &members(&["wave1"]));
        assert_eq!(eff.heartbeat_interval, "5s");
    }

    #[test]
    fn malformed_jitter_falls_back_to_safe_default_not_zero() {
        // #491: pre-fix this fell back to ZERO, silently turning a
        // typo'd jitter into a fleet-wide simultaneous download.
        // (Note "30minutes" is VALID humantime — full unit names
        // parse — so the malformed sample must be genuinely broken.)
        let eff = EffectiveConfig {
            target_version_jitter: "not-a-duration".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(
            eff.target_version_jitter_duration(),
            Duration::from_secs(10 * 60),
        );
        // Explicit 0s remains an honoured opt-in.
        let zero = EffectiveConfig {
            target_version_jitter: "0s".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(zero.target_version_jitter_duration(), Duration::ZERO);
    }

    #[test]
    fn malformed_intervals_fall_back_to_builtin_defaults() {
        let broken = EffectiveConfig {
            heartbeat_interval: "not-a-duration".into(),
            host_perf_interval: "not-a-duration".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(broken.heartbeat_duration(), Duration::from_secs(30));
        assert_eq!(broken.host_perf_duration(), Duration::from_secs(60));

        // Well-formed values are honoured rather than replaced.
        let valid = EffectiveConfig {
            heartbeat_interval: "5s".into(),
            host_perf_interval: "2m".into(),
            ..EffectiveConfig::builtin_defaults()
        };
        assert_eq!(valid.heartbeat_duration(), Duration::from_secs(5));
        assert_eq!(valid.host_perf_duration(), Duration::from_secs(120));
    }

    #[test]
    fn process_perf_inactive_while_flag_is_off() {
        let now = utc("2024-01-01T00:00:00Z");
        let off = EffectiveConfig::builtin_defaults();
        assert!(!off.process_perf_active_at(now));
        // A future expiry alone does not enable collection.
        let off_with_expiry = EffectiveConfig {
            process_perf_expires_at: Some(utc("2024-01-01T00:30:00Z")),
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(!off_with_expiry.process_perf_active_at(now));
    }

    #[test]
    fn process_perf_without_expiry_is_indefinitely_on() {
        let on = EffectiveConfig {
            process_perf_enabled: true,
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(on.process_perf_active_at(utc("2024-01-01T00:00:00Z")));
        assert!(on.process_perf_active_at(utc("2099-12-31T23:59:59Z")));
    }

    #[test]
    fn process_perf_expires_at_the_deadline() {
        let on = EffectiveConfig {
            process_perf_enabled: true,
            process_perf_expires_at: Some(utc("2024-01-01T00:30:00Z")),
            ..EffectiveConfig::builtin_defaults()
        };
        assert!(on.process_perf_active_at(utc("2024-01-01T00:29:59Z")));
        // The deadline itself already counts as expired (`now < deadline`).
        assert!(!on.process_perf_active_at(utc("2024-01-01T00:30:00Z")));
        // A deadline in the past means expired, not "indefinitely on".
        assert!(!on.process_perf_active_at(utc("2024-01-02T00:00:00Z")));
    }

    #[test]
    fn process_perf_fields_cascade_like_the_rest() {
        let deadline = utc("2024-01-01T00:30:00Z");
        let global = ConfigScope {
            process_perf_enabled: Some(true),
            process_perf_expires_at: Some(deadline),
            process_perf_top_n: Some(50),
            ..scope()
        };
        let pc = ConfigScope {
            process_perf_top_n: Some(5),
            ..scope()
        };
        let (eff, warns) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert!(eff.process_perf_enabled); // from global
        assert_eq!(eff.process_perf_expires_at, Some(deadline)); // from global
        assert_eq!(eff.process_perf_top_n, 5); // from pc
        assert!(warns.is_empty());
    }
}