1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
//! etcd adapter — streaming reads (watch) and writes (put) for etcd key-value store.
//!
//! Provides two graph nodes:
//!
//! - [`etcd_sub`] — producer that emits a key-prefix snapshot followed by live watch events
//! - [`etcd_pub`] — consumer that writes key-value pairs to etcd, with optional lease support
//!
//! # Setup
//!
//! ## Local (Docker)
//!
//! ```sh
//! docker run --rm -p 2379:2379 \
//! -e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
//! -e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
//! gcr.io/etcd-development/etcd:v3.5.0
//! ```
//!
//! ## Kubernetes
//!
//! Deploy etcd as a `StatefulSet` and expose it via a `ClusterIP` service:
//!
//! ```yaml
//! apiVersion: apps/v1
//! kind: StatefulSet
//! metadata:
//! name: etcd
//! spec:
//! selector:
//! matchLabels:
//! app: etcd
//! serviceName: etcd
//! replicas: 1
//! template:
//! metadata:
//! labels:
//! app: etcd
//! spec:
//! containers:
//! - name: etcd
//! image: gcr.io/etcd-development/etcd:v3.5.0
//! ports:
//! - containerPort: 2379
//! env:
//! - name: ETCD_LISTEN_CLIENT_URLS
//! value: http://0.0.0.0:2379
//! - name: ETCD_ADVERTISE_CLIENT_URLS
//! value: http://etcd:2379
//! ---
//! apiVersion: v1
//! kind: Service
//! metadata:
//! name: etcd
//! spec:
//! selector:
//! app: etcd
//! ports:
//! - port: 2379
//! targetPort: 2379
//! ```
//!
//! Connect from within the cluster using the service DNS name:
//!
//! ```ignore
//! let conn = EtcdConnection::new("http://etcd:2379");
//! ```
//!
//! For a cluster with multiple replicas, pass all endpoints:
//!
//! ```ignore
//! let conn = EtcdConnection::with_endpoints([
//! "http://etcd-0.etcd:2379",
//! "http://etcd-1.etcd:2379",
//! "http://etcd-2.etcd:2379",
//! ]);
//! ```
//!
//! # Subscribing to a key prefix
//!
//! [`etcd_sub`] first emits a snapshot of all keys matching the prefix as
//! [`EtcdEventKind::Put`] events, then streams live watch events (puts and deletes).
//! The watch is registered *before* the GET, so no writes are missed during the handoff.
//!
//! ```ignore
//! use wingfoil::adapters::etcd::*;
//! use wingfoil::*;
//!
//! let conn = EtcdConnection::new("http://localhost:2379");
//!
//! etcd_sub(conn, "/config/")
//! .collapse()
//! .for_each(|event, _| {
//! println!("{:?} {} = {:?}", event.kind, event.entry.key, event.entry.value_str())
//! })
//! .run(RunMode::RealTime, RunFor::Forever)
//! .unwrap();
//! ```
//!
//! # Publishing key-value pairs
//!
//! [`etcd_pub`] (or the fluent `.etcd_pub()` method) consumes a
//! `Burst<EtcdEntry>` stream and writes each entry to etcd.
//!
//! ```ignore
//! use wingfoil::adapters::etcd::*;
//! use wingfoil::*;
//!
//! let conn = EtcdConnection::new("http://localhost:2379");
//!
//! // Write a fixed set of keys once, then stop.
//! constant(burst![
//! EtcdEntry { key: "/config/host".into(), value: b"localhost".to_vec() },
//! EtcdEntry { key: "/config/port".into(), value: b"8080".to_vec() },
//! ])
//! .etcd_pub(conn, None, true)
//! .run(RunMode::RealTime, RunFor::Cycles(1))
//! .unwrap();
//! ```
//!
//! ## Leases
//!
//! Pass a TTL to attach an etcd lease. Keys are automatically kept alive while the
//! graph is running and revoked immediately on clean shutdown:
//!
//! ```ignore
//! use std::time::Duration;
//! use wingfoil::adapters::etcd::*;
//! use wingfoil::*;
//!
//! let conn = EtcdConnection::new("http://localhost:2379");
//!
//! // Keys expire 30 s after the consumer shuts down (or immediately on clean shutdown).
//! ticker(Duration::from_secs(10))
//! .map(|_| burst![EtcdEntry { key: "/heartbeat".into(), value: b"ok".to_vec() }])
//! .etcd_pub(conn, Some(Duration::from_secs(30)), true)
//! .run(RunMode::RealTime, RunFor::Forever)
//! .unwrap();
//! ```
//!
//! # Round-trip example
//!
//! See [`examples/etcd`](https://github.com/search?q=examples/etcd) for a full working
//! example that seeds keys with `etcd_pub`, watches them with `etcd_sub`, transforms
//! the values, and writes the results to a second prefix.
//!
//! ```ignore
//! use wingfoil::adapters::etcd::*;
//! use wingfoil::*;
//!
//! const SOURCE: &str = "/example/source/";
//! const DEST: &str = "/example/dest/";
//!
//! let conn = EtcdConnection::new("http://localhost:2379");
//!
//! let seed = constant(burst![
//! EtcdEntry { key: format!("{SOURCE}greeting"), value: b"hello".to_vec() },
//! EtcdEntry { key: format!("{SOURCE}subject"), value: b"world".to_vec() },
//! ])
//! .etcd_pub(conn.clone(), None, true);
//!
//! let round_trip = etcd_sub(conn.clone(), SOURCE)
//! .map(|burst| {
//! burst.into_iter().map(|event| {
//! let upper = event.entry.value_str().unwrap_or("").to_uppercase().into_bytes();
//! EtcdEntry { key: event.entry.key.replacen(SOURCE, DEST, 1), value: upper }
//! })
//! .collect::<Burst<EtcdEntry>>()
//! })
//! .etcd_pub(conn, None, true);
//!
//! Graph::new(vec![seed, round_trip], RunMode::RealTime, RunFor::Cycles(3)).run().unwrap();
//! ```
pub use *;
pub use *;
/// Connection configuration for etcd.
/// A single key-value pair from etcd.
/// The type of change represented by an [`EtcdEvent`].
/// An event from an etcd watch stream.
///
/// Snapshot events (from the initial GET) are always [`EtcdEventKind::Put`].
/// Subsequent watch events reflect the actual change type from etcd.