Skip to main content

osproxy_etcd/
watch.rs

1//! The etcd network paths: initial connect + read, and the background watch that
2//! keeps the cached snapshot fresh. Split from [`super`] because these can only be
3//! exercised against a live etcd (the Docker-gated `tests/etcd_live.rs`), so they
4//! are excluded from unit-coverage by filename like the other live harnesses.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use arc_swap::ArcSwap;
10use etcd_client::{Client, EventType};
11use osproxy_core::Clock;
12use osproxy_observe::{decode_directive_set, DirectiveSet};
13
14use super::{EtcdDirectiveStore, EtcdError};
15
/// Pause between watch attempts after the etcd stream finishes or fails.
///
/// Keeping this fixed and non-zero stops a flapping control plane from spinning
/// the task in a hot reconnect loop. A production adapter would add jitter here.
const RECONNECT_DELAY: Duration = Duration::from_millis(1_000);
20
21impl EtcdDirectiveStore {
22    /// Connects to `endpoints`, reads the directive set at `key`, and spawns the
23    /// background watch that keeps it fresh.
24    ///
25    /// The value at `key` is the same JSON body the admin publish endpoint accepts
26    /// (`{"directives":[...]}`). A missing key is a valid empty set (everything
27    /// off). A *malformed* value at startup is treated as empty, fail-safe, the
28    /// same as an unparseable later publish.
29    ///
30    /// # Errors
31    /// [`EtcdError::Connect`] if etcd cannot be reached or the initial read fails.
32    pub async fn connect(
33        endpoints: &[String],
34        key: impl Into<String>,
35        clock: Arc<dyn Clock>,
36    ) -> Result<Self, EtcdError> {
37        let key = key.into();
38        let mut client = Client::connect(endpoints, None).await?;
39        let resp = client.get(key.clone(), None).await?;
40        let initial = resp
41            .kvs()
42            .first()
43            .and_then(|kv| decode_directive_set(kv.value(), clock.as_ref()).ok())
44            .unwrap_or_default();
45        let current = Arc::new(ArcSwap::from_pointee(initial));
46
47        // Capture the runtime handle to spawn the watch (spawn discipline: never a
48        // bare tokio::spawn in a library; mirror osproxy-otlp).
49        let handle = tokio::runtime::Handle::current();
50        let endpoints = endpoints.to_vec();
51        let task_current = Arc::clone(&current);
52        handle.spawn(watch_loop(endpoints, key, clock, task_current));
53
54        Ok(Self::from_snapshot(current))
55    }
56}
57
58/// Watches `key` forever, applying each update to `current`. Reconnects after a
59/// bounded delay whenever the stream ends or etcd errors, so a transient outage
60/// degrades to "serve the last snapshot" rather than losing fleet control.
61async fn watch_loop(
62    endpoints: Vec<String>,
63    key: String,
64    clock: Arc<dyn Clock>,
65    current: Arc<ArcSwap<DirectiveSet>>,
66) {
67    loop {
68        // A clean stream end or any error both fall through to the reconnect delay;
69        // the snapshot is left untouched (last-good) across the gap.
70        let _ = watch_once(&endpoints, &key, clock.as_ref(), &current).await;
71        tokio::time::sleep(RECONNECT_DELAY).await;
72    }
73}
74
75/// One connect → watch → drain cycle. Returns when the stream ends or errors.
76async fn watch_once(
77    endpoints: &[String],
78    key: &str,
79    clock: &dyn Clock,
80    current: &ArcSwap<DirectiveSet>,
81) -> Result<(), etcd_client::Error> {
82    let mut client = Client::connect(endpoints, None).await?;
83    // Re-read once on (re)connect so an update missed during a disconnect is not
84    // lost, then stream subsequent changes.
85    let resp = client.get(key, None).await?;
86    if let Some(kv) = resp.kvs().first() {
87        super::apply_value(current, kv.value(), clock);
88    }
89    // The stream itself holds the watch open for the drain's lifetime.
90    let mut stream = client.watch(key, None).await?;
91    while let Some(resp) = stream.message().await? {
92        for event in resp.events() {
93            match event.event_type() {
94                EventType::Put => {
95                    if let Some(kv) = event.kv() {
96                        super::apply_value(current, kv.value(), clock);
97                    }
98                }
99                // A deleted key means "no directives": flip to the empty set.
100                EventType::Delete => {
101                    current.store(Arc::new(DirectiveSet::new()));
102                }
103            }
104        }
105    }
106    Ok(())
107}