kanade-shared 0.9.0

Shared wire types, NATS subject helpers, KV constants, YAML manifest schema, and teravars-backed config loader for the kanade endpoint-management system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
//! Layered fleet configuration that lives in the `agent_config` KV
//! bucket (Sprint 6).
//!
//! Three scopes flow into the agent's effective config, in order of
//! increasing specificity:
//!
//! ```text
//! built-in default        (compiled in; floor when nothing else is set)
//! agent_config:global     (whole-fleet default)
//! agent_config:groups.<g> (per-group override; one or more apply)
//! agent_config:pcs.<pc>   (per-PC override; final word)
//! ```
//!
//! The wire type for every scope is the same — [`ConfigScope`], a
//! struct of `Option<T>` fields. `Some` means "this scope sets this
//! field"; `None` means "fall through to the next layer". JSON
//! `null` is the same as the field being absent thanks to serde's
//! struct-level `default`.
//!
//! [`resolve`] is the pure functional core that flattens the scope
//! stack into an [`EffectiveConfig`] (concrete values, no Options).
//! When the same field is set on more than one group the PC belongs
//! to, alphabetical group order wins last (CSS-cascade style) and a
//! [`ResolutionWarning::MultiGroupConflict`] is emitted so the
//! caller can log it — pre-empts the "why does this PC have value X?
//! none of my groups say X" debugging session.

use std::collections::BTreeMap;
use std::time::Duration;

use serde::{Deserialize, Serialize};

/// Per-scope partial config. Every field is `Option<T>`: `Some` =
/// set, `None` = inherit from the next-less-specific scope. Serde
/// `default` + `skip_serializing_if` keeps the wire JSON tight —
/// unset fields don't appear in the bucket value.
///
/// The same type is used for the global, per-group and per-PC rows;
/// [`resolve`] layers them into an [`EffectiveConfig`].
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(default)]
pub struct ConfigScope {
    /// Rollout target version for agents covered by this scope.
    /// `None` = this scope expresses no opinion about the version.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_version: Option<String>,
    /// Random sleep window applied at each agent before it starts
    /// downloading a new target_version, so a fleet-wide rollout
    /// doesn't slam the Object Store / broker all at once
    /// (humantime, e.g. `"30m"`). `"0s"` (or unset) = no jitter.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_version_jitter: Option<String>,
    /// Inventory interval as a humantime string (e.g. `"24h"`).
    /// Stored unparsed; parsing happens on the resolved
    /// [`EffectiveConfig`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inventory_interval: Option<String>,
    /// Inventory jitter as a humantime string (e.g. `"10m"`).
    /// Stored unparsed, like `inventory_interval`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inventory_jitter: Option<String>,
    /// Inventory on/off switch for this scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inventory_enabled: Option<bool>,
    /// Heartbeat interval as a humantime string (e.g. `"30s"`).
    /// Stored unparsed, like `inventory_interval`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub heartbeat_interval: Option<String>,
}

impl ConfigScope {
    /// Returns `true` when this scope sets nothing at all, i.e. every
    /// field is `None` and the scope would be a no-op during
    /// resolution.
    pub fn is_empty(&self) -> bool {
        // Destructure exhaustively so that adding a field to the
        // struct is a compile error here until it is accounted for.
        let Self {
            target_version,
            target_version_jitter,
            inventory_interval,
            inventory_jitter,
            inventory_enabled,
            heartbeat_interval,
        } = self;

        let any_field_set = target_version.is_some()
            || target_version_jitter.is_some()
            || inventory_interval.is_some()
            || inventory_jitter.is_some()
            || inventory_enabled.is_some()
            || heartbeat_interval.is_some();

        !any_field_set
    }
}

/// Concrete config the agent runs against once the scope stack has
/// been flattened. `target_version` stays `Option` because "no
/// rollout target set anywhere" is a meaningful state (the agent
/// just keeps running the version it has); the other fields always
/// have a value, falling back to [`EffectiveConfig::builtin_defaults`]
/// when no scope sets them.
///
/// Duration-like fields are kept as the raw strings taken from the
/// winning scope; use the `*_duration` accessors to get a parsed
/// [`Duration`] with a safe fallback.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct EffectiveConfig {
    /// Rollout target version, or `None` when no scope sets one.
    pub target_version: Option<String>,
    /// Raw jitter string for version rollouts; parsed by
    /// [`EffectiveConfig::target_version_jitter_duration`].
    pub target_version_jitter: String,
    /// Raw inventory interval string; parsed by
    /// [`EffectiveConfig::inventory_interval_duration`].
    pub inventory_interval: String,
    /// Raw inventory jitter string; parsed by
    /// [`EffectiveConfig::inventory_jitter_duration`].
    pub inventory_jitter: String,
    /// Inventory on/off switch (built-in default: `true`).
    pub inventory_enabled: bool,
    /// Raw heartbeat interval string; parsed by
    /// [`EffectiveConfig::heartbeat_duration`].
    pub heartbeat_interval: String,
}

impl EffectiveConfig {
    /// The compiled-in floor: the value every field takes when no KV
    /// scope overrides it. These match the historic agent.toml
    /// defaults, so a fleet that has never written to the bucket keeps
    /// its pre-Sprint 6 behaviour.
    pub fn builtin_defaults() -> Self {
        Self {
            target_version: None,
            // "0s" disables rollout jitter (the pre-Sprint-11
            // behaviour). Larger fleets (≥ 100 hosts) are expected to
            // raise it — `kanade agent rollout … --jitter 30m` or
            // `kanade config set target_version_jitter=30m` — so the
            // Object Store fan-out doesn't synchronise.
            target_version_jitter: String::from("0s"),
            inventory_interval: String::from("24h"),
            inventory_jitter: String::from("10m"),
            inventory_enabled: true,
            heartbeat_interval: String::from("30s"),
        }
    }

    /// `heartbeat_interval` as a [`Duration`]; a malformed string
    /// yields the built-in 30 s. The parse error is not logged here —
    /// that is left to the caller so test code stays quiet.
    pub fn heartbeat_duration(&self) -> Duration {
        Self::duration_or(&self.heartbeat_interval, Duration::from_secs(30))
    }

    /// `inventory_interval` as a [`Duration`]; a malformed string
    /// yields 24 hours.
    pub fn inventory_interval_duration(&self) -> Duration {
        Self::duration_or(&self.inventory_interval, Duration::from_secs(24 * 60 * 60))
    }

    /// `inventory_jitter` as a [`Duration`]; a malformed string
    /// yields 600 seconds (10 minutes).
    pub fn inventory_jitter_duration(&self) -> Duration {
        Self::duration_or(&self.inventory_jitter, Duration::from_secs(600))
    }

    /// `target_version_jitter` as a [`Duration`]; a malformed string
    /// yields zero, i.e. no jitter. With zero jitter the download
    /// starts as soon as target_version drifts, which is fine for
    /// small fleets / canary smoke tests but bad for 3000 hosts.
    pub fn target_version_jitter_duration(&self) -> Duration {
        Self::duration_or(&self.target_version_jitter, Duration::ZERO)
    }

    /// Parse a humantime string, substituting `fallback` when the
    /// string does not parse.
    fn duration_or(raw: &str, fallback: Duration) -> Duration {
        match humantime::parse_duration(raw) {
            Ok(parsed) => parsed,
            Err(_) => fallback,
        }
    }
}

impl Default for EffectiveConfig {
    fn default() -> Self {
        Self::builtin_defaults()
    }
}

/// Non-fatal observations from [`resolve`] that the caller should
/// log. Currently only "two or more of this PC's groups set the same
/// field" — useful pre-emptive debugging signal when canary / wave /
/// dept overlays accidentally overlap.
///
/// Note that [`resolve`] only looks at *which* groups set a field,
/// not at the values: two groups setting a field to the identical
/// value still produce a warning.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolutionWarning {
    /// More than one of the agent's groups sets `field`.
    MultiGroupConflict {
        /// Name of the [`ConfigScope`] field that is set by several
        /// groups (e.g. `"inventory_interval"`).
        field: &'static str,
        /// Group names that set this field, in alphabetical order
        /// (i.e. the application order — the last name in this list
        /// is the one whose value actually won).
        groups: Vec<String>,
    },
}

/// Collapse the layered scopes into the single [`EffectiveConfig`] an
/// agent should run with.
///
/// Arguments:
///
/// * `global` — the `global` row of the `agent_config` bucket, or
///   `None` when that row does not exist yet.
/// * `group_scopes` — the `groups.<name>` rows, keyed by group name.
///   Passing every row in the bucket is fine: rows whose name is not
///   listed in `my_groups` are never consulted.
/// * `pc_scope` — the `pcs.<pc_id>` row of this agent, or `None`
///   when that row does not exist yet.
/// * `my_groups` — the groups this agent currently belongs to (from
///   the `agent_groups` bucket). Order and duplicates don't matter.
///
/// Layers are applied from least to most specific — built-in
/// defaults, then global, then each matching group in alphabetical
/// order (so the alphabetically last group wins), then the per-PC
/// row. Whenever two or more of the agent's groups set the same
/// field, a [`ResolutionWarning::MultiGroupConflict`] is returned
/// next to the resolved config; warnings are ordered by field name.
pub fn resolve(
    global: Option<&ConfigScope>,
    group_scopes: &BTreeMap<String, ConfigScope>,
    pc_scope: Option<&ConfigScope>,
    my_groups: &[String],
) -> (EffectiveConfig, Vec<ResolutionWarning>) {
    let mut effective = EffectiveConfig::builtin_defaults();

    if let Some(fleet_wide) = global {
        apply_scope(&mut effective, fleet_wide);
    }

    // Alphabetical, duplicate-free membership list: this fixes the
    // application order and therefore what "last wins" means.
    let mut memberships: Vec<&str> = my_groups.iter().map(String::as_str).collect();
    memberships.sort();
    memberships.dedup();

    // field name -> groups (in application order) that set the field.
    let mut claimed_by: BTreeMap<&'static str, Vec<String>> = BTreeMap::new();
    for name in memberships {
        // A membership without a scope row contributes nothing.
        let scope = match group_scopes.get(name) {
            Some(scope) => scope,
            None => continue,
        };

        let claims = [
            ("target_version", scope.target_version.is_some()),
            ("target_version_jitter", scope.target_version_jitter.is_some()),
            ("inventory_interval", scope.inventory_interval.is_some()),
            ("inventory_jitter", scope.inventory_jitter.is_some()),
            ("inventory_enabled", scope.inventory_enabled.is_some()),
            ("heartbeat_interval", scope.heartbeat_interval.is_some()),
        ];
        for (field, is_set) in claims {
            if is_set {
                claimed_by.entry(field).or_default().push(name.to_owned());
            }
        }

        // Later (alphabetically greater) groups overwrite earlier ones.
        apply_scope(&mut effective, scope);
    }

    // Only fields claimed by more than one group are worth reporting.
    let warnings = claimed_by
        .into_iter()
        .filter(|(_, groups)| groups.len() > 1)
        .map(|(field, groups)| ResolutionWarning::MultiGroupConflict { field, groups })
        .collect();

    // The per-PC row always has the final word.
    if let Some(own) = pc_scope {
        apply_scope(&mut effective, own);
    }

    (effective, warnings)
}

fn apply_scope(out: &mut EffectiveConfig, s: &ConfigScope) {
    if let Some(v) = &s.target_version {
        out.target_version = Some(v.clone());
    }
    if let Some(v) = &s.target_version_jitter {
        out.target_version_jitter = v.clone();
    }
    if let Some(v) = &s.inventory_interval {
        out.inventory_interval = v.clone();
    }
    if let Some(v) = &s.inventory_jitter {
        out.inventory_jitter = v.clone();
    }
    if let Some(v) = s.inventory_enabled {
        out.inventory_enabled = v;
    }
    if let Some(v) = &s.heartbeat_interval {
        out.heartbeat_interval = v.clone();
    }
}

// Unit tests for the scope resolver and the `ConfigScope` wire format.
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an all-`None` scope, used as the base of
    /// struct-update expressions below.
    fn scope() -> ConfigScope {
        ConfigScope::default()
    }

    /// No scopes and no groups: the result is exactly the built-in
    /// defaults and nothing is warned about.
    #[test]
    fn empty_stack_gives_builtin_defaults() {
        let (eff, warns) = resolve(None, &BTreeMap::new(), None, &[]);
        assert_eq!(eff, EffectiveConfig::builtin_defaults());
        assert!(warns.is_empty());
    }

    /// A global scope overrides only the fields it sets.
    #[test]
    fn global_only() {
        let g = ConfigScope {
            inventory_interval: Some("12h".into()),
            heartbeat_interval: Some("60s".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&g), &BTreeMap::new(), None, &[]);
        assert_eq!(eff.inventory_interval, "12h");
        assert_eq!(eff.heartbeat_interval, "60s");
        // Unset fields stay at builtin defaults.
        assert_eq!(eff.inventory_jitter, "10m");
        assert!(eff.inventory_enabled);
        assert!(eff.target_version.is_none());
    }

    /// A group the agent belongs to beats the global scope.
    #[test]
    fn group_overrides_global() {
        let global = ConfigScope {
            inventory_interval: Some("24h".into()),
            ..scope()
        };
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                inventory_interval: Some("1h".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(Some(&global), &groups, None, &["canary".into()]);
        assert_eq!(eff.inventory_interval, "1h");
        assert!(warns.is_empty());
    }

    /// The per-PC scope beats a group scope.
    #[test]
    fn pc_overrides_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                inventory_interval: Some("12h".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            inventory_interval: Some("5m".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.inventory_interval, "5m");
    }

    /// The per-PC scope beats the global scope even with no groups in
    /// between.
    #[test]
    fn pc_overrides_global_when_no_group_match() {
        let global = ConfigScope {
            inventory_interval: Some("24h".into()),
            ..scope()
        };
        let pc = ConfigScope {
            inventory_interval: Some("30m".into()),
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.inventory_interval, "30m");
    }

    /// A more specific scope only replaces the fields it names; the
    /// rest keep the value from the less specific scope.
    #[test]
    fn partial_override_only_changes_named_fields() {
        let global = ConfigScope {
            inventory_interval: Some("24h".into()),
            heartbeat_interval: Some("30s".into()),
            ..scope()
        };
        let pc = ConfigScope {
            heartbeat_interval: Some("15s".into()),
            // intentionally not touching inventory_interval
            ..scope()
        };
        let (eff, _) = resolve(Some(&global), &BTreeMap::new(), Some(&pc), &[]);
        assert_eq!(eff.inventory_interval, "24h"); // from global
        assert_eq!(eff.heartbeat_interval, "15s"); // from pc
    }

    /// Two of the agent's groups set the same field: the
    /// alphabetically last group wins and one warning lists both
    /// groups in alphabetical order.
    #[test]
    fn multi_group_conflict_emits_warning() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                inventory_interval: Some("12h".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                inventory_interval: Some("24h".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        // "dept-eng" sorts before "wave1", so wave1 wins (last alphabetical).
        assert_eq!(eff.inventory_interval, "12h");
        assert_eq!(warns.len(), 1);
        match &warns[0] {
            ResolutionWarning::MultiGroupConflict { field, groups } => {
                assert_eq!(*field, "inventory_interval");
                assert_eq!(groups, &vec!["dept-eng".to_string(), "wave1".to_string()]);
            }
        }
    }

    /// Two groups that set *different* fields both apply and do not
    /// count as a conflict.
    #[test]
    fn group_alphabetical_last_wins_no_conflict_when_only_one_sets() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                inventory_interval: Some("12h".into()),
                ..scope()
            },
        );
        groups.insert(
            "dept-eng".into(),
            ConfigScope {
                // Different field — doesn't conflict.
                heartbeat_interval: Some("15s".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "dept-eng".into()]);
        assert_eq!(eff.inventory_interval, "12h");
        assert_eq!(eff.heartbeat_interval, "15s");
        assert!(warns.is_empty());
    }

    /// A membership with no matching scope row is skipped without
    /// error or warning.
    #[test]
    fn unknown_group_is_silently_ignored() {
        // my_groups names a group that has no scope row yet. Common
        // on the first agent that joins a freshly-named group; the
        // resolver should treat it as a no-op, not an error.
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                inventory_interval: Some("1h".into()),
                ..scope()
            },
        );
        let (eff, warns) = resolve(
            None,
            &groups,
            None,
            &["canary".into(), "ghost-group".into()],
        );
        assert_eq!(eff.inventory_interval, "1h");
        assert!(warns.is_empty());
    }

    /// A scope row for a group the agent is not a member of has no
    /// effect.
    #[test]
    fn group_scope_not_applied_when_pc_not_in_group() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "canary".into(),
            ConfigScope {
                target_version: Some("0.3.0".into()),
                ..scope()
            },
        );
        let (eff, _) = resolve(None, &groups, None, &["dept-eng".into()]);
        // PC is NOT in canary, so the rollout target shouldn't apply.
        assert!(eff.target_version.is_none());
    }

    /// The same group listed twice in `my_groups` is applied once and
    /// is not reported as a conflict.
    #[test]
    fn duplicate_group_names_dedup_silently() {
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                inventory_interval: Some("12h".into()),
                ..scope()
            },
        );
        // my_groups carries the same name twice — the dedup pass
        // keeps it from looking like a conflict-with-self.
        let (eff, warns) = resolve(None, &groups, None, &["wave1".into(), "wave1".into()]);
        assert_eq!(eff.inventory_interval, "12h");
        assert!(warns.is_empty());
    }

    /// Pins the exact wire JSON (set fields only, in declaration
    /// order) and checks it deserializes back to an equal value.
    #[test]
    fn config_scope_serde_round_trip() {
        let s = ConfigScope {
            target_version: Some("0.3.0".into()),
            heartbeat_interval: Some("15s".into()),
            ..scope()
        };
        let json = serde_json::to_string(&s).unwrap();
        // Only set fields appear in JSON.
        assert_eq!(
            json,
            r#"{"target_version":"0.3.0","heartbeat_interval":"15s"}"#
        );
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    /// An all-`None` scope serializes to `{}` and `{}` deserializes
    /// back to an all-`None` scope.
    #[test]
    fn empty_config_scope_round_trips_as_empty_json() {
        let s = ConfigScope::default();
        assert!(s.is_empty());
        let json = serde_json::to_string(&s).unwrap();
        assert_eq!(json, "{}");
        let back: ConfigScope = serde_json::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    /// Unknown JSON keys are ignored rather than rejected.
    #[test]
    fn deserialize_tolerates_unknown_fields_for_forward_compat() {
        // Sprint 6+ may add fields (log_level, jitter strategy, …);
        // older agent / backend builds should keep parsing.
        let json = r#"{"target_version":"0.3.0","future_knob":"future_value"}"#;
        let s: ConfigScope = serde_json::from_str(json).unwrap();
        assert_eq!(s.target_version.as_deref(), Some("0.3.0"));
    }

    /// Guards the ordering guarantee that the per-PC scope is applied
    /// after the group scopes.
    #[test]
    fn pc_does_not_override_other_pcs() {
        // Sanity: pc_scope passed in is by definition the row for THIS
        // pc; the caller is responsible for picking the right one.
        // This test guards against a future refactor that accidentally
        // wires in the wrong scope by ensuring the apply happens last
        // (after groups), so the PC value is the visible one.
        let mut groups = BTreeMap::new();
        groups.insert(
            "wave1".into(),
            ConfigScope {
                inventory_interval: Some("12h".into()),
                ..scope()
            },
        );
        let pc = ConfigScope {
            inventory_interval: Some("5m".into()),
            ..scope()
        };
        let (eff, _) = resolve(None, &groups, Some(&pc), &["wave1".into()]);
        assert_eq!(eff.inventory_interval, "5m");
    }
}