Skip to main content

kanade_shared/wire/
agent_config.rs

1//! Layered fleet configuration that lives in the `agent_config` KV
2//! bucket (Sprint 6).
3//!
4//! Three scopes flow into the agent's effective config, in order of
5//! increasing specificity:
6//!
7//! ```text
8//! built-in default        (compiled in; floor when nothing else is set)
9//!   ↓
10//! agent_config:global     (whole-fleet default)
11//!   ↓
12//! agent_config:groups.<g> (per-group override; one or more apply)
13//!   ↓
14//! agent_config:pcs.<pc>   (per-PC override; final word)
15//! ```
16//!
17//! The wire type for every scope is the same — [`ConfigScope`], a
18//! struct of `Option<T>` fields. `Some` means "this scope sets this
19//! field"; `None` means "fall through to the next layer". JSON
20//! `null` is the same as the field being absent thanks to serde's
21//! struct-level `default`.
22//!
23//! [`resolve`] is the pure functional core that flattens the scope
24//! stack into an [`EffectiveConfig`] (concrete values, no Options).
25//! When the same field is set on more than one group the PC belongs
26//! to, alphabetical group order wins last (CSS-cascade style) and a
27//! [`ResolutionWarning::MultiGroupConflict`] is emitted so the
28//! caller can log it — pre-empts the "why does this PC have value X?
29//! none of my groups say X" debugging session.
30//!
31//! v0.20.0: `inventory_interval` / `inventory_jitter` /
32//! `inventory_enabled` removed. They were leftovers from the
33//! v0.14-retired hardcoded WMI inventory loop; runtime inventory
34//! now lives in operator-defined probe jobs (`configs/jobs/
35//! inventory-*.yaml`), so the layered config no longer carries
36//! anything about it.
37
38use std::collections::BTreeMap;
39use std::time::Duration;
40
41use serde::{Deserialize, Serialize};
42
43/// Per-scope partial config. Every field is `Option<T>`: `Some` =
44/// set, `None` = inherit from the next-less-specific scope. Serde
45/// `default` + `skip_serializing_if` keeps the wire JSON tight —
46/// unset fields don't appear in the bucket value.
47#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
48#[serde(default)]
49pub struct ConfigScope {
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub target_version: Option<String>,
52    /// Random sleep window applied at each agent before it starts
53    /// downloading a new target_version, so a fleet-wide rollout
54    /// doesn't slam the Object Store / broker all at once
55    /// (humantime, e.g. `"30m"`). `"0s"` (or unset) = no jitter.
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub target_version_jitter: Option<String>,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub heartbeat_interval: Option<String>,
60    /// Cadence for the whole-host perf snapshot loop (`host_perf.<pc_id>`).
61    /// Separate from `heartbeat_interval` because the host-wide
62    /// sysinfo refresh is slightly heavier than the per-process self-
63    /// perf one (memory + disk + network counters in addition to CPU)
64    /// and gappier data is acceptable for graphing. Default 60 s.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub host_perf_interval: Option<String>,
67    /// v0.41 / Phase 2: operator-driven opt-in for the heavy per-
68    /// process snapshot loop (`process_perf.<pc_id>`). Default off
69    /// because walking the full process table is the most expensive
70    /// sysinfo call on Citrix / RDS hosts; flip on only when an
71    /// operator is actively investigating a host. Paired with
72    /// `process_perf_expires_at` to auto-disable after a window —
73    /// see [`EffectiveConfig::process_perf_active_at`].
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub process_perf_enabled: Option<bool>,
76    /// Wall-clock RFC3339 timestamp after which `process_perf_enabled`
77    /// is considered expired and the agent stops publishing process
78    /// snapshots — even if the flag itself is still `true`. Lets the
79    /// SPA toggle "ON for 30 m" without the operator having to come
80    /// back and clear the flag manually. `None` (or the past) +
81    /// enabled=true means "indefinitely on" (rare; mostly a test path).
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
84    /// Top-N processes (ordered by CPU%) the agent publishes per tick.
85    /// 20 by default — enough to cover the usual suspects on a
86    /// constrained host without ballooning the projector row volume
87    /// when several PCs are simultaneously in investigation mode.
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub process_perf_top_n: Option<u32>,
90}
91
92impl ConfigScope {
93    pub fn is_empty(&self) -> bool {
94        self.target_version.is_none()
95            && self.target_version_jitter.is_none()
96            && self.heartbeat_interval.is_none()
97            && self.host_perf_interval.is_none()
98            && self.process_perf_enabled.is_none()
99            && self.process_perf_expires_at.is_none()
100            && self.process_perf_top_n.is_none()
101    }
102}
103
104/// Concrete config the agent runs against once the scope stack has
105/// been flattened. `target_version` stays `Option` because "no
106/// rollout target set anywhere" is a meaningful state (the agent
107/// just keeps running the version it has); the other fields always
108/// have a value, falling back to [`EffectiveConfig::builtin_defaults`]
109/// when no scope sets them.
110#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
111pub struct EffectiveConfig {
112    pub target_version: Option<String>,
113    pub target_version_jitter: String,
114    pub heartbeat_interval: String,
115    pub host_perf_interval: String,
116    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_enabled`].
117    pub process_perf_enabled: bool,
118    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_expires_at`].
119    pub process_perf_expires_at: Option<chrono::DateTime<chrono::Utc>>,
120    /// v0.41 / Phase 2 — see [`ConfigScope::process_perf_top_n`].
121    pub process_perf_top_n: u32,
122}
123
124impl EffectiveConfig {
125    /// Floor values used when no KV scope sets a given field.
126    pub fn builtin_defaults() -> Self {
127        Self {
128            target_version: None,
129            // 0s = "no jitter" = pre-Sprint-11 behaviour. Operators
130            // running ≥ 100-host fleets are expected to bump this
131            // (via `kanade agent rollout … --jitter 30m` or
132            // `kanade config set target_version_jitter=30m`) so the
133            // Object Store fan-out doesn't synchronise. See issue
134            // #26 for the broader "safe-by-default" debate.
135            target_version_jitter: "0s".to_string(),
136            heartbeat_interval: "30s".to_string(),
137            // 60 s default: 2× the heartbeat cadence so the chart has
138            // a roughly aligned point every other heartbeat, while
139            // keeping the host-wide sysinfo refresh (which on Citrix /
140            // RDS hosts is the heaviest call we make) out of the
141            // tight 30 s loop.
142            host_perf_interval: "60s".to_string(),
143            // Off by default. Per-process collection walks the full
144            // OS process table — the most expensive sysinfo call —
145            // so the fleet pays nothing until an operator opts a
146            // specific host into "investigation mode".
147            process_perf_enabled: false,
148            process_perf_expires_at: None,
149            process_perf_top_n: 20,
150        }
151    }
152
153    /// Returns true when process-perf collection should actually run
154    /// **right now**: the flag is set AND no expiry has passed.
155    /// Centralised here so agent / backend / SPA all agree on the
156    /// active-vs-expired distinction.
157    pub fn process_perf_active_at(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
158        if !self.process_perf_enabled {
159            return false;
160        }
161        match self.process_perf_expires_at {
162            None => true,
163            Some(deadline) => now < deadline,
164        }
165    }
166
167    /// Parsed `heartbeat_interval`, falling back to the built-in
168    /// 30 s default on a malformed string. Logging the parse error
169    /// is the caller's job (so that test code can stay quiet).
170    pub fn heartbeat_duration(&self) -> Duration {
171        humantime::parse_duration(&self.heartbeat_interval).unwrap_or(Duration::from_secs(30))
172    }
173
174    /// Parsed `host_perf_interval`, falling back to the built-in
175    /// 60 s default on a malformed string.
176    pub fn host_perf_duration(&self) -> Duration {
177        humantime::parse_duration(&self.host_perf_interval).unwrap_or(Duration::from_secs(60))
178    }
179
180    /// Parsed `target_version_jitter`, falling back to zero (= no
181    /// jitter) on a malformed string. Zero means "start downloading
182    /// immediately when target_version drifts" — fine for small
183    /// fleets / canary smoke tests, bad for 3000 hosts.
184    pub fn target_version_jitter_duration(&self) -> Duration {
185        humantime::parse_duration(&self.target_version_jitter).unwrap_or(Duration::ZERO)
186    }
187}
188
189impl Default for EffectiveConfig {
190    fn default() -> Self {
191        Self::builtin_defaults()
192    }
193}
194
/// Non-fatal findings produced by [`resolve`]; the caller is
/// expected to log them.
///
/// The only finding today is that two or more of this PC's groups
/// set the same field. [`resolve`] does not compare the values, so
/// the warning is raised even when the groups happen to agree. It is
/// an early debugging signal for canary / wave / dept overlays that
/// accidentally overlap.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResolutionWarning {
    /// More than one applied group sets `field`.
    MultiGroupConflict {
        /// Name of the config field that is set more than once.
        field: &'static str,
        /// Names of the groups that set the field, alphabetically
        /// sorted. That is also the order in which they are applied,
        /// so the last entry is the group whose value took effect at
        /// the group layer.
        groups: Vec<String>,
    },
}
209
210/// Flatten the scope stack into an [`EffectiveConfig`].
211///
212/// * `global` — the `global` key in the `agent_config` bucket
213///   (`None` if no row yet).
214/// * `group_scopes` — every `groups.<name>` row currently in the
215///   bucket (the caller can pass all of them; only the ones whose
216///   name is in `my_groups` are applied).
217/// * `pc_scope` — the `pcs.<pc_id>` row for this agent (`None` if
218///   no row yet).
219/// * `my_groups` — this agent's current memberships (from the
220///   `agent_groups` bucket).
221///
222/// Order of application: built-in default → global → per-group
223/// (alphabetical, last wins) → per-pc. Multi-group conflicts (≥ 2
224/// of `my_groups` setting the same field) are returned as warnings
225/// alongside the resolved config.
226pub fn resolve(
227    global: Option<&ConfigScope>,
228    group_scopes: &BTreeMap<String, ConfigScope>,
229    pc_scope: Option<&ConfigScope>,
230    my_groups: &[String],
231) -> (EffectiveConfig, Vec<ResolutionWarning>) {
232    let mut out = EffectiveConfig::builtin_defaults();
233    let mut warnings = Vec::new();
234
235    if let Some(g) = global {
236        apply_scope(&mut out, g);
237    }
238
239    // Sort + dedup the group list so iteration order is deterministic
240    // and "last wins" is well-defined.
241    let mut sorted_groups: Vec<&str> = my_groups.iter().map(String::as_str).collect();
242    sorted_groups.sort();
243    sorted_groups.dedup();
244
245    // Pass 1: find multi-setter fields so the caller can warn before
246    // pass 2 silently lets the alphabetical-last value win.
247    let mut setters: BTreeMap<&'static str, Vec<String>> = BTreeMap::new();
248    for g in &sorted_groups {
249        let Some(scope) = group_scopes.get(*g) else {
250            continue;
251        };
252        if scope.target_version.is_some() {
253            setters
254                .entry("target_version")
255                .or_default()
256                .push(g.to_string());
257        }
258        if scope.target_version_jitter.is_some() {
259            setters
260                .entry("target_version_jitter")
261                .or_default()
262                .push(g.to_string());
263        }
264        if scope.heartbeat_interval.is_some() {
265            setters
266                .entry("heartbeat_interval")
267                .or_default()
268                .push(g.to_string());
269        }
270        if scope.host_perf_interval.is_some() {
271            setters
272                .entry("host_perf_interval")
273                .or_default()
274                .push(g.to_string());
275        }
276        if scope.process_perf_enabled.is_some() {
277            setters
278                .entry("process_perf_enabled")
279                .or_default()
280                .push(g.to_string());
281        }
282        if scope.process_perf_expires_at.is_some() {
283            setters
284                .entry("process_perf_expires_at")
285                .or_default()
286                .push(g.to_string());
287        }
288        if scope.process_perf_top_n.is_some() {
289            setters
290                .entry("process_perf_top_n")
291                .or_default()
292                .push(g.to_string());
293        }
294    }
295    for (field, groups) in setters {
296        if groups.len() > 1 {
297            warnings.push(ResolutionWarning::MultiGroupConflict { field, groups });
298        }
299    }
300
301    // Pass 2: actually apply, alphabetically. Last-wins by construction.
302    for g in &sorted_groups {
303        if let Some(scope) = group_scopes.get(*g) {
304            apply_scope(&mut out, scope);
305        }
306    }
307
308    if let Some(p) = pc_scope {
309        apply_scope(&mut out, p);
310    }
311
312    (out, warnings)
313}
314
315fn apply_scope(out: &mut EffectiveConfig, s: &ConfigScope) {
316    if let Some(v) = &s.target_version {
317        out.target_version = Some(v.clone());
318    }
319    if let Some(v) = &s.target_version_jitter {
320        out.target_version_jitter = v.clone();
321    }
322    if let Some(v) = &s.heartbeat_interval {
323        out.heartbeat_interval = v.clone();
324    }
325    if let Some(v) = &s.host_perf_interval {
326        out.host_perf_interval = v.clone();
327    }
328    if let Some(v) = s.process_perf_enabled {
329        out.process_perf_enabled = v;
330    }
331    if let Some(v) = s.process_perf_expires_at {
332        out.process_perf_expires_at = Some(v);
333    }
334    if let Some(v) = s.process_perf_top_n {
335        out.process_perf_top_n = v;
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// A scope whose only set field is `heartbeat_interval`.
    fn heartbeat(interval: &str) -> ConfigScope {
        ConfigScope {
            heartbeat_interval: Some(interval.to_owned()),
            ..ConfigScope::default()
        }
    }

    /// Builds the `groups.<name>` map from `(name, scope)` rows.
    fn group_rows(rows: Vec<(&str, ConfigScope)>) -> BTreeMap<String, ConfigScope> {
        rows.into_iter()
            .map(|(name, scope)| (name.to_owned(), scope))
            .collect()
    }

    /// Owned membership list built from string literals.
    fn members(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    #[test]
    fn empty_stack_gives_builtin_defaults() {
        let (resolved, warnings) = resolve(None, &BTreeMap::new(), None, &[]);
        assert_eq!(resolved, EffectiveConfig::builtin_defaults());
        assert!(warnings.is_empty());
    }

    #[test]
    fn global_only() {
        let global = heartbeat("60s");
        let (resolved, _) = resolve(Some(&global), &BTreeMap::new(), None, &[]);
        assert_eq!(resolved.heartbeat_interval, "60s");
        // Fields the global row does not set keep their built-in defaults.
        assert_eq!(resolved.target_version_jitter, "0s");
        assert!(resolved.target_version.is_none());
    }

    #[test]
    fn group_overrides_global() {
        let global = heartbeat("30s");
        let groups = group_rows(vec![("canary", heartbeat("5s"))]);
        let (resolved, warnings) =
            resolve(Some(&global), &groups, None, &members(&["canary"]));
        assert_eq!(resolved.heartbeat_interval, "5s");
        assert!(warnings.is_empty());
    }

    #[test]
    fn pc_overrides_group() {
        let groups = group_rows(vec![("wave1", heartbeat("30s"))]);
        let pc = heartbeat("5s");
        let (resolved, _) = resolve(None, &groups, Some(&pc), &members(&["wave1"]));
        assert_eq!(resolved.heartbeat_interval, "5s");
    }

    #[test]
    fn pc_overrides_global_when_no_group_match() {
        let global = heartbeat("30s");
        let pc = heartbeat("5s");
        let (resolved, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(resolved.heartbeat_interval, "5s");
    }

    #[test]
    fn partial_override_only_changes_named_fields() {
        let global = ConfigScope {
            target_version_jitter: Some("30m".into()),
            ..heartbeat("30s")
        };
        // The PC row deliberately leaves target_version_jitter alone.
        let pc = heartbeat("15s");
        let (resolved, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(resolved.target_version_jitter, "30m"); // from global
        assert_eq!(resolved.heartbeat_interval, "15s"); // from pc
    }

    #[test]
    fn multi_group_conflict_emits_warning() {
        let groups = group_rows(vec![
            ("wave1", heartbeat("5s")),
            ("dept-eng", heartbeat("60s")),
        ]);
        let (resolved, warnings) =
            resolve(None, &groups, None, &members(&["wave1", "dept-eng"]));
        // "dept-eng" sorts before "wave1", so wave1 is applied last and wins.
        assert_eq!(resolved.heartbeat_interval, "5s");
        assert_eq!(
            warnings,
            vec![ResolutionWarning::MultiGroupConflict {
                field: "heartbeat_interval",
                groups: members(&["dept-eng", "wave1"]),
            }]
        );
    }

    #[test]
    fn group_alphabetical_last_wins_no_conflict_when_only_one_sets() {
        let groups = group_rows(vec![
            ("wave1", heartbeat("5s")),
            (
                "dept-eng",
                ConfigScope {
                    // A different field, so there is nothing to conflict with.
                    target_version_jitter: Some("15m".into()),
                    ..ConfigScope::default()
                },
            ),
        ]);
        let (resolved, warnings) =
            resolve(None, &groups, None, &members(&["wave1", "dept-eng"]));
        assert_eq!(resolved.heartbeat_interval, "5s");
        assert_eq!(resolved.target_version_jitter, "15m");
        assert!(warnings.is_empty());
    }

    #[test]
    fn unknown_group_is_silently_ignored() {
        // my_groups names a group that has no scope row yet. Common
        // for the first agent joining a freshly named group; the
        // resolver must treat it as a no-op rather than an error.
        let groups = group_rows(vec![("canary", heartbeat("5s"))]);
        let (resolved, warnings) = resolve(
            None,
            &groups,
            None,
            &members(&["canary", "ghost-group"]),
        );
        assert_eq!(resolved.heartbeat_interval, "5s");
        assert!(warnings.is_empty());
    }

    #[test]
    fn group_scope_not_applied_when_pc_not_in_group() {
        let groups = group_rows(vec![(
            "canary",
            ConfigScope {
                target_version: Some("0.3.0".into()),
                ..ConfigScope::default()
            },
        )]);
        let (resolved, _) = resolve(None, &groups, None, &members(&["dept-eng"]));
        // The PC is NOT in canary, so the rollout target must not apply.
        assert!(resolved.target_version.is_none());
    }

    #[test]
    fn duplicate_group_names_dedup_silently() {
        let groups = group_rows(vec![("wave1", heartbeat("5s"))]);
        // The same name appears twice in my_groups; the dedup pass
        // keeps that from being reported as a conflict with itself.
        let (resolved, warnings) =
            resolve(None, &groups, None, &members(&["wave1", "wave1"]));
        assert_eq!(resolved.heartbeat_interval, "5s");
        assert!(warnings.is_empty());
    }

    #[test]
    fn config_scope_serde_round_trip() {
        let original = ConfigScope {
            target_version: Some("0.3.0".into()),
            ..heartbeat("15s")
        };
        let encoded = serde_json::to_string(&original).unwrap();
        // Only the fields that are set show up in the JSON.
        assert_eq!(
            encoded,
            r#"{"target_version":"0.3.0","heartbeat_interval":"15s"}"#
        );
        let decoded: ConfigScope = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn empty_config_scope_round_trips_as_empty_json() {
        let original = ConfigScope::default();
        assert!(original.is_empty());
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "{}");
        let decoded: ConfigScope = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn deserialize_tolerates_unknown_fields_for_forward_compat() {
        // Older agent / backend builds must keep parsing when fields
        // are added later. v0.20 relies on the same tolerance in the
        // other direction: pre-v0.20 rows that still carry
        // inventory_interval / inventory_jitter / inventory_enabled
        // parse fine as the new (smaller) ConfigScope — the dropped
        // fields are simply unknown and ignored.
        let raw =
            r#"{"target_version":"0.3.0","inventory_interval":"24h","future_knob":"future_value"}"#;
        let parsed: ConfigScope = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.target_version.as_deref(), Some("0.3.0"));
    }

    #[test]
    fn pc_does_not_override_other_pcs() {
        // Sanity: the pc_scope passed in is by definition the row for
        // THIS pc; picking the right row is the caller's job. This
        // guards against a refactor that wires in the scopes in the
        // wrong order by checking that the PC row is applied last
        // (after the groups), so its value is the visible one.
        let groups = group_rows(vec![("wave1", heartbeat("30s"))]);
        let pc = heartbeat("5s");
        let (resolved, _) = resolve(None, &groups, Some(&pc), &members(&["wave1"]));
        assert_eq!(resolved.heartbeat_interval, "5s");
    }
}