osproxy_etcd/watch.rs
//! The etcd network paths: initial connect + read, and the background watch that
//! keeps the cached snapshot fresh. Split from [`super`] because these can only be
//! exercised against a live etcd (the Docker-gated `tests/etcd_live.rs`), so they
//! are excluded from unit-coverage by filename like the other live harnesses.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use arc_swap::ArcSwap;
10use etcd_client::{Client, EventType};
11use osproxy_core::Clock;
12use osproxy_observe::{decode_directive_set, DirectiveSet};
13
14use super::{EtcdDirectiveStore, EtcdError};
15
/// Pause between watch cycles: after the etcd stream ends or fails, the watch
/// task sleeps this long before it connects again. The fixed, non-zero delay
/// keeps a flapping control plane from turning the task into a busy loop; a
/// production adapter would add jitter on top of it.
const RECONNECT_DELAY: Duration = Duration::from_millis(1_000);
20
21impl EtcdDirectiveStore {
22 /// Connects to `endpoints`, reads the directive set at `key`, and spawns the
23 /// background watch that keeps it fresh.
24 ///
25 /// The value at `key` is the same JSON body the admin publish endpoint accepts
26 /// (`{"directives":[...]}`). A missing key is a valid empty set (everything
27 /// off). A *malformed* value at startup is treated as empty, fail-safe, the
28 /// same as an unparseable later publish.
29 ///
30 /// # Errors
31 /// [`EtcdError::Connect`] if etcd cannot be reached or the initial read fails.
32 pub async fn connect(
33 endpoints: &[String],
34 key: impl Into<String>,
35 clock: Arc<dyn Clock>,
36 ) -> Result<Self, EtcdError> {
37 let key = key.into();
38 let mut client = Client::connect(endpoints, None).await?;
39 let resp = client.get(key.clone(), None).await?;
40 let initial = resp
41 .kvs()
42 .first()
43 .and_then(|kv| decode_directive_set(kv.value(), clock.as_ref()).ok())
44 .unwrap_or_default();
45 let current = Arc::new(ArcSwap::from_pointee(initial));
46
47 // Capture the runtime handle to spawn the watch (spawn discipline: never a
48 // bare tokio::spawn in a library; mirror osproxy-otlp).
49 let handle = tokio::runtime::Handle::current();
50 let endpoints = endpoints.to_vec();
51 let task_current = Arc::clone(¤t);
52 handle.spawn(watch_loop(endpoints, key, clock, task_current));
53
54 Ok(Self::from_snapshot(current))
55 }
56}
57
58/// Watches `key` forever, applying each update to `current`. Reconnects after a
59/// bounded delay whenever the stream ends or etcd errors, so a transient outage
60/// degrades to "serve the last snapshot" rather than losing fleet control.
61async fn watch_loop(
62 endpoints: Vec<String>,
63 key: String,
64 clock: Arc<dyn Clock>,
65 current: Arc<ArcSwap<DirectiveSet>>,
66) {
67 loop {
68 // A clean stream end or any error both fall through to the reconnect delay;
69 // the snapshot is left untouched (last-good) across the gap.
70 let _ = watch_once(&endpoints, &key, clock.as_ref(), ¤t).await;
71 tokio::time::sleep(RECONNECT_DELAY).await;
72 }
73}
74
75/// One connect → watch → drain cycle. Returns when the stream ends or errors.
76async fn watch_once(
77 endpoints: &[String],
78 key: &str,
79 clock: &dyn Clock,
80 current: &ArcSwap<DirectiveSet>,
81) -> Result<(), etcd_client::Error> {
82 let mut client = Client::connect(endpoints, None).await?;
83 // Re-read once on (re)connect so an update missed during a disconnect is not
84 // lost, then stream subsequent changes.
85 let resp = client.get(key, None).await?;
86 if let Some(kv) = resp.kvs().first() {
87 super::apply_value(current, kv.value(), clock);
88 }
89 // The stream itself holds the watch open for the drain's lifetime.
90 let mut stream = client.watch(key, None).await?;
91 while let Some(resp) = stream.message().await? {
92 for event in resp.events() {
93 match event.event_type() {
94 EventType::Put => {
95 if let Some(kv) = event.kv() {
96 super::apply_value(current, kv.value(), clock);
97 }
98 }
99 // A deleted key means "no directives": flip to the empty set.
100 EventType::Delete => {
101 current.store(Arc::new(DirectiveSet::new()));
102 }
103 }
104 }
105 }
106 Ok(())
107}