Skip to main content

osproxy_etcd/
lib.rs

//! Reference distributed [`DirectiveStore`] backed by etcd v3.
//!
//! A fleet of proxy instances must all see the *same* diagnostics directives, and
//! a control-plane flip must reach every instance with **no restart** (`docs/05`
//! §3, NFR-T3, ADR-013). This adapter realizes that over etcd's watch API using
//! the **watch-and-cache** model: a background task subscribes to one etcd key and
//! keeps a locally cached [`DirectiveSet`] snapshot fresh, so [`DirectiveStore::load`]
//! on the request hot path is a cheap `Arc` clone, never per-request network I/O.
//!
//! It deliberately backs **only** the directive (observability) control plane.
//! The migration/placement store (`osproxy-control::MigrationStore`) needs a
//! linearizable compare-and-swap and a fallible, async seam; wiring it over etcd
//! is a separate step gated on that seam refactor.
//!
//! Posture:
//! - **Fail-fast at startup**: [`EtcdDirectiveStore::connect`] does an initial
//!   read, so an unreachable/misconfigured etcd is a loud construction error, not
//!   a silent empty directive set.
//! - **Fail-safe while running**: a transient etcd outage or a *malformed* publish
//!   keeps the **last good** snapshot rather than blanking diagnostics; the watch
//!   task reconnects with a bounded delay.
//! - **One fail-closed decoder**: directives are decoded with
//!   [`osproxy_observe::decode_directive_set`], the same decoder the admin
//!   `POST /admin/directives` endpoint uses, so a directive means the same thing
//!   however it is published, and a typo'd key can never widen its blast radius.
26#![deny(missing_docs)]
27
28use std::sync::Arc;
29
30use arc_swap::ArcSwap;
31use osproxy_core::Clock;
32use osproxy_observe::{decode_directive_set, DirectiveSet, DirectiveStore};
33
34mod watch;
35
36/// Errors constructing the store. Only startup is fallible to the caller; once
37/// running, the watch task absorbs transient failures (keeping the last snapshot).
38#[derive(Debug, thiserror::Error)]
39pub enum EtcdError {
40    /// The initial connection or read against etcd failed, fail fast rather than
41    /// serve an empty directive set the operator did not intend.
42    #[error("etcd connect/read failed at startup")]
43    Connect(#[from] etcd_client::Error),
44}
45
46/// A [`DirectiveStore`] whose snapshot is kept fresh by an etcd watch.
47///
48/// Construct with [`EtcdDirectiveStore::connect`] inside a Tokio runtime; it loads
49/// the initial set and spawns the background watch. Clone is cheap (shared
50/// snapshot) so the same store can be handed to the pipeline and an admin surface.
51#[derive(Clone, Debug)]
52pub struct EtcdDirectiveStore {
53    current: Arc<ArcSwap<DirectiveSet>>,
54}
55
56impl EtcdDirectiveStore {
57    /// Wraps an already-built shared snapshot, the seam the [`watch`] connect path
58    /// uses after its initial read.
59    fn from_snapshot(current: Arc<ArcSwap<DirectiveSet>>) -> Self {
60        Self { current }
61    }
62}
63
64impl DirectiveStore for EtcdDirectiveStore {
65    fn load(&self) -> Arc<DirectiveSet> {
66        // Lock-free atomic load of the watch-maintained snapshot (hot path).
67        self.current.load_full()
68    }
69}
70
71/// Swaps in a freshly decoded set, or **keeps the last good snapshot** if the
72/// value does not parse, a malformed publish must never blank fleet diagnostics.
73fn apply_value(current: &ArcSwap<DirectiveSet>, value: &[u8], clock: &dyn Clock) {
74    if let Ok(set) = decode_directive_set(value, clock) {
75        current.store(Arc::new(set));
76    }
77}
78
79#[cfg(test)]
80#[path = "lib_tests.rs"]
81mod tests;