# etcd Adapter
Streams a key-prefix snapshot + live watch from etcd (`etcd_sub`) and writes
key-value pairs to etcd (`etcd_pub`).
## Module Structure
```
etcd/
  mod.rs                # EtcdConnection, EtcdEntry, EtcdEvent, public re-exports
  read.rs               # etcd_sub() producer
  write.rs              # etcd_pub() consumer, EtcdPubOperators trait
  integration_tests.rs  # Integration tests (requires Docker, gated by feature)
  CLAUDE.md             # This file
```
## Key Components
### Reading from etcd — `etcd_sub`
- `etcd_sub(conn, prefix)` — produces `Burst<EtcdEvent>`
  - **Phase 1:** emits a snapshot of all current KVs matching the prefix as `EtcdEventKind::Put`
  - **Phase 2:** streams live watch events (Put and Delete)
  - The watch is opened **before** the GET to guarantee no writes are missed in the handoff window
  - Watch events with `mod_revision ≤ snapshot_rev` are filtered out to prevent duplicates
### Writing to etcd — `etcd_pub`
- `etcd_pub(conn, upstream, lease_ttl)` — consumes `Burst<EtcdEntry>`, issues one PUT per entry
- `EtcdPubOperators::etcd_pub(conn, lease_ttl)` — fluent API on `Rc<dyn Stream<Burst<EtcdEntry>>>`
- Pass `lease_ttl: None` for plain writes (keys persist until deleted)
- Pass `lease_ttl: Some(Duration)` to attach an etcd lease with automatic keepalive renewal;
the lease is revoked on clean shutdown so keys vanish immediately
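
etcd grants leases with a TTL in whole seconds, so a `Duration` passed as `lease_ttl` has to be mapped to an integer at some point. The helper below is a minimal sketch of one way to do that; `lease_ttl_secs` is a hypothetical name, and the round-up and one-second floor are assumptions for illustration, not confirmed behaviour of `etcd_pub`.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of the adapter's documented API):
/// map the `lease_ttl` argument to a lease TTL in whole seconds.
///
/// - `None` means "no lease": the key is written plainly and persists.
/// - `Some(d)` is rounded up to the next whole second, with a floor of 1,
///   so a sub-second duration never becomes a zero TTL (assumed policy).
pub fn lease_ttl_secs(lease_ttl: Option<Duration>) -> Option<i64> {
    lease_ttl.map(|d| {
        // Round up when there is any fractional part.
        let extra = u64::from(d.subsec_nanos() > 0);
        let secs = d.as_secs().saturating_add(extra).max(1);
        // Clamp instead of panicking on absurdly large durations.
        i64::try_from(secs).unwrap_or(i64::MAX)
    })
}
```

Rounding up rather than down errs on the side of keys living slightly longer than requested, which is usually the safer direction when keepalives are also in play.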
### Types
- `EtcdConnection::new(endpoint)` — single endpoint (e.g. `"http://localhost:2379"`)
- `EtcdConnection::with_endpoints(iter)` — cluster endpoints
- `EtcdEntry { key: String, value: Vec<u8> }` — plain key-value pair; `.value_str()` for UTF-8
- `EtcdEvent { kind: EtcdEventKind, entry: EtcdEntry, revision: i64 }` — watch event
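
The shapes listed above can be sketched as plain Rust. Field names and types are taken from the list; the derives, the two-variant layout of `EtcdEventKind`, and the return type of `value_str` (`Result<&str, Utf8Error>`) are assumptions made for illustration and may differ from the definitions in `mod.rs`.

```rust
use std::str::Utf8Error;

/// Plain key-value pair, mirroring the fields listed above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdEntry {
    pub key: String,
    pub value: Vec<u8>,
}

impl EtcdEntry {
    /// Assumed signature: borrow the value as UTF-8 text,
    /// returning an error if the bytes are not valid UTF-8.
    pub fn value_str(&self) -> Result<&str, Utf8Error> {
        std::str::from_utf8(&self.value)
    }
}

/// The two event kinds mentioned in this document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EtcdEventKind {
    Put,
    Delete,
}

/// Watch event: what happened, to which entry, at which etcd revision.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdEvent {
    pub kind: EtcdEventKind,
    pub entry: EtcdEntry,
    pub revision: i64,
}
```

Keeping `value` as raw bytes matches etcd's data model, where values are opaque; `value_str` is then a convenience for the common case of text payloads.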
## Snapshot → Watch Handoff (Race Prevention)
```
Thread: WATCH(prefix) ──→ GET(prefix, snapshot_rev) ──→ emit snapshot ──→ drain watch_stream
                                                                          (skip mod_rev ≤ snapshot_rev)
```
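Because the watch is registered before the GET, every write falls into one of two cases.
A write committed after `snapshot_rev` arrives in `watch_stream` with
`mod_revision > snapshot_rev`, so it is never missed.
A write at or before `snapshot_rev` is already reflected in the GET result, so its watch
event (`mod_revision ≤ snapshot_rev`), if any, is filtered out as a duplicate.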
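
The filtering rule can be modelled without etcd at all. The sketch below is a simplified in-memory stand-in, not the code in `read.rs`: `Event`, `Kind`, and `handoff` are hypothetical names, and the snapshot and buffered watch events are passed in as plain vectors.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    Put,
    Delete,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    pub kind: Kind,
    pub key: String,
    pub value: Vec<u8>,
    pub mod_revision: i64,
}

/// Model of the handoff: emit every snapshot KV as a Put, then append
/// only those watch events that are strictly newer than the snapshot.
///
/// `snapshot` holds (key, value, mod_revision) triples from the GET;
/// `snapshot_rev` is the revision the GET was served at;
/// `watch` holds events buffered since the watch was opened.
pub fn handoff(
    snapshot: Vec<(String, Vec<u8>, i64)>,
    snapshot_rev: i64,
    watch: Vec<Event>,
) -> Vec<Event> {
    // Phase 1: snapshot entries are always reported as Put.
    let mut out: Vec<Event> = snapshot
        .into_iter()
        .map(|(key, value, mod_revision)| Event {
            kind: Kind::Put,
            key,
            value,
            mod_revision,
        })
        .collect();

    // Phase 2: drop anything the snapshot already covers.
    out.extend(watch.into_iter().filter(|e| e.mod_revision > snapshot_rev));
    out
}
```

For example, with a snapshot at revision 5 containing one key, a buffered watch event at revision 5 for that key is dropped, while buffered events at revisions 6 and 7 pass through after the snapshot.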
## Pre-Commit Requirements
1. **Run integration tests (requires Docker):**
```bash
cargo test --features etcd-integration-test -p wingfoil \
-- --test-threads=1 etcd::integration_tests
```
2. **Run standard checks:**
```bash
cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test -p wingfoil
```
## Integration Test Details
Tests use `testcontainers` (`SyncRunner`) to start a `gcr.io/etcd-development/etcd:v3.5.0`
container per test. Docker must be running. No environment variables required.
Feature flag: `etcd-integration-test` (implies `etcd`).
Tests must be run with `--test-threads=1` to avoid port conflicts between containers.
### Test coverage
| Test | What it verifies |
|---|---|
| `test_connection_refused` | Error propagates when etcd is unreachable |
| `test_sub_empty_snapshot_then_live_write` | Empty snapshot + live event works correctly |
| `test_sub_snapshot_with_existing_keys` | Pre-existing keys appear in snapshot phase |
| `test_sub_live_updates` | Live events arrive after snapshot |
| `test_pub_round_trip` | `etcd_pub` writes are readable via direct client |
| `test_pub_lease_keys_expire_after_revoke` | Leased keys vanish immediately when consumer stops |
| `test_pub_lease_keepalive_extends_ttl` | Keepalive renews lease so keys survive past raw TTL |
| `test_pub_no_lease_keys_persist` | `None` lease: keys remain after consumer stops |
| `test_sub_delete_events` | `EtcdEventKind::Delete` is emitted correctly |
| `test_sub_no_race_between_snapshot_and_watch` | Concurrent write not missed or duplicated |
## Notes
- `etcd-client` is pinned to `"0.18"`. The API changed significantly between versions.
- `testcontainers-modules` 0.15 does not include an etcd module; we use `GenericImage`
with `gcr.io/etcd-development/etcd` directly.
- `etcd_sub` is designed for `RunMode::RealTime`. Using it in `HistoricalFrom` mode is
technically valid but timestamps will be wall-clock `NanoTime::now()`, not historical.