Skip to main content

aex_core/
endpoint.rs

//! `Endpoint` — a single way a recipient can reach a sender's data plane.
//!
//! Endpoints carry an optional [`EndpointHealth`] block populated by the
//! control plane's background re-validator (ADR-0014 + ADR-0021). The
//! health machine is asymmetric: three consecutive failed probes flip an
//! endpoint to `Unhealthy`, but two consecutive successes are required
//! to flip it back to `Healthy`. Freshly admitted endpoints start
//! `Healthy` because the admission flow in `aex-control-plane` already
//! proved them reachable once.
//!
//! Introduced in Sprint 2 for transport plurality (`v1.3.0-beta.1`).
//! A transfer carries a list of endpoints (`reachable_at[]`); the recipient
//! SDK tries them in the sender's declared priority order per ADR-0012
//! (sender-ranked, serial, sticky) and stops at the first that works.
//!
//! ```text
//!     reachable_at[] (JSONB on transfers, JSON on the wire)
//!         │
//!         ├── { kind: "cloudflare_quick", url: "https://x.trycloudflare.com", priority: 0 }
//!         ├── { kind: "iroh",              url: "iroh:NodeID@relay:443",        priority: 1 }
//!         └── { kind: "frp",               url: "https://frp.example.com/x",    priority: 2 }
//!              │
//!              └── recipient tries in priority order, sticks with first success
//! ```
//!
//! ## Forward compatibility
//!
//! `kind` is a `String`, not an enum, so unknown kinds from a newer peer
//! are preserved losslessly. Recipients MUST skip endpoints whose `kind`
//! is not in [`Endpoint::KNOWN_KINDS`] rather than erroring. This mirrors
//! the capability-bit philosophy in ADR-0018 — new transports land
//! additively without requiring a wire bump.
34
35use serde::{Deserialize, Serialize};
36
37/// A single way to reach a sender's data plane.
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct Endpoint {
40    /// Transport kind. See [`Endpoint::KIND_*`] constants for known values.
41    /// Unknown values are preserved but MUST be skipped by recipients.
42    pub kind: String,
43    /// Reachable address. Schema is transport-specific:
44    /// - `cloudflare_quick`, `cloudflare_named`, `tailscale_funnel`, `frp`: `https://host/...`
45    /// - `iroh`: `iroh:<NodeID>@<relay_host>:<port>`
46    pub url: String,
47    /// Sender's preference (lower = try first). Ties broken by array order.
48    #[serde(default)]
49    pub priority: i32,
50    /// Optional last-known-good timestamp (Unix seconds) used by the control
51    /// plane's health cache. Absent on fresh endpoints.
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub health_hint_unix: Option<i64>,
54    /// Background-validator health state (ADR-0014, ADR-0021). Absent
55    /// on wire payloads sent by clients; populated by the control
56    /// plane after the first re-probe cycle. Recipients SHOULD skip
57    /// endpoints whose `health.status` is `Unhealthy`; SDKs that don't
58    /// recognise this field are forward-compatible because `health` is
59    /// additive.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub health: Option<EndpointHealth>,
62}
63
64/// Persisted health state for a single [`Endpoint`]. Kept inline in
65/// the `reachable_at` JSONB so a control-plane restart doesn't reset
66/// the debounce counters; this means a flapping endpoint that was
67/// about to flip `Unhealthy` keeps its accrued failure count across
68/// deploys.
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
70pub struct EndpointHealth {
71    pub status: HealthStatus,
72    /// Count of consecutive failed probes since the last success.
73    /// Caps at [`EndpointHealth::FAIL_THRESHOLD`] (any higher is
74    /// irrelevant — the endpoint is already `Unhealthy`).
75    #[serde(default)]
76    pub consecutive_fails: u8,
77    /// Count of consecutive successful probes since the last failure.
78    /// Caps at [`EndpointHealth::SUCCESS_THRESHOLD`].
79    #[serde(default)]
80    pub consecutive_successes: u8,
81    /// Unix seconds of the most recent probe attempt. `None` if the
82    /// background monitor hasn't run yet.
83    #[serde(default, skip_serializing_if = "Option::is_none")]
84    pub last_probe_unix: Option<i64>,
85}
86
87/// Current health classification of an endpoint.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(rename_all = "lowercase")]
90pub enum HealthStatus {
91    Healthy,
92    Unhealthy,
93}
94
95impl EndpointHealth {
96    /// Consecutive failures required to flip `Healthy → Unhealthy`
97    /// (ADR-0021).
98    pub const FAIL_THRESHOLD: u8 = 3;
99    /// Consecutive successes required to flip `Unhealthy → Healthy`
100    /// (ADR-0021). Deliberately higher friction than the failure
101    /// threshold — a flapping endpoint should not race a recipient's
102    /// connection attempt.
103    pub const SUCCESS_THRESHOLD: u8 = 2;
104
105    /// Initial health for an endpoint just admitted by the CP's
106    /// admission-time `/healthz` probe.
107    pub fn fresh_healthy(now_unix: i64) -> Self {
108        Self {
109            status: HealthStatus::Healthy,
110            consecutive_fails: 0,
111            consecutive_successes: 0,
112            last_probe_unix: Some(now_unix),
113        }
114    }
115
116    /// Fold a successful probe into this state. Returns `self` for
117    /// ergonomic `fold`/`for` reassignment in loops.
118    pub fn on_probe_success(mut self, now_unix: i64) -> Self {
119        self.last_probe_unix = Some(now_unix);
120        self.consecutive_fails = 0;
121        // Saturating add so we never wrap around at u8::MAX.
122        self.consecutive_successes = self.consecutive_successes.saturating_add(1);
123        if matches!(self.status, HealthStatus::Unhealthy)
124            && self.consecutive_successes >= Self::SUCCESS_THRESHOLD
125        {
126            self.status = HealthStatus::Healthy;
127            self.consecutive_successes = 0;
128        }
129        // Cap the counter at the threshold once we're Healthy to keep
130        // the on-wire JSON small and bounded.
131        if matches!(self.status, HealthStatus::Healthy)
132            && self.consecutive_successes > Self::SUCCESS_THRESHOLD
133        {
134            self.consecutive_successes = Self::SUCCESS_THRESHOLD;
135        }
136        self
137    }
138
139    /// Fold a failed probe into this state.
140    pub fn on_probe_failure(mut self, now_unix: i64) -> Self {
141        self.last_probe_unix = Some(now_unix);
142        self.consecutive_successes = 0;
143        self.consecutive_fails = self.consecutive_fails.saturating_add(1);
144        if matches!(self.status, HealthStatus::Healthy)
145            && self.consecutive_fails >= Self::FAIL_THRESHOLD
146        {
147            self.status = HealthStatus::Unhealthy;
148            self.consecutive_fails = 0;
149        }
150        if matches!(self.status, HealthStatus::Unhealthy)
151            && self.consecutive_fails > Self::FAIL_THRESHOLD
152        {
153            self.consecutive_fails = Self::FAIL_THRESHOLD;
154        }
155        self
156    }
157
158    /// True iff the endpoint is currently classified `Healthy`.
159    pub fn is_healthy(&self) -> bool {
160        matches!(self.status, HealthStatus::Healthy)
161    }
162}
163
164impl Endpoint {
165    /// Cloudflare Quick Tunnel (`*.trycloudflare.com`, ephemeral).
166    pub const KIND_CLOUDFLARE_QUICK: &'static str = "cloudflare_quick";
167    /// Cloudflare Named Tunnel (`*.workers.dev` or custom hostname, persistent).
168    pub const KIND_CLOUDFLARE_NAMED: &'static str = "cloudflare_named";
169    /// Iroh peer-to-peer with DERP relay fallback.
170    pub const KIND_IROH: &'static str = "iroh";
171    /// Tailscale Funnel (public hostname on a tailnet).
172    pub const KIND_TAILSCALE_FUNNEL: &'static str = "tailscale_funnel";
173    /// FRP self-hosted reverse proxy.
174    pub const KIND_FRP: &'static str = "frp";
175
176    /// All kinds this crate knows how to reach. Adding a new transport in a
177    /// later sprint adds a constant here + extends this array.
178    pub const KNOWN_KINDS: &'static [&'static str] = &[
179        Self::KIND_CLOUDFLARE_QUICK,
180        Self::KIND_CLOUDFLARE_NAMED,
181        Self::KIND_IROH,
182        Self::KIND_TAILSCALE_FUNNEL,
183        Self::KIND_FRP,
184    ];
185
186    /// True if `self.kind` is in [`Self::KNOWN_KINDS`]. Recipients use this
187    /// to skip forward-incompatible endpoints without failing the transfer.
188    pub fn is_known_kind(&self) -> bool {
189        Self::KNOWN_KINDS.contains(&self.kind.as_str())
190    }
191
192    /// Convenience: Cloudflare Quick Tunnel endpoint at priority 0.
193    pub fn cloudflare_quick(url: impl Into<String>) -> Self {
194        Self {
195            kind: Self::KIND_CLOUDFLARE_QUICK.into(),
196            url: url.into(),
197            priority: 0,
198            health_hint_unix: None,
199            health: None,
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an `Unhealthy` state with the given counters whose
    /// last probe happened at Unix time 0.
    fn unhealthy(consecutive_fails: u8, consecutive_successes: u8) -> EndpointHealth {
        EndpointHealth {
            status: HealthStatus::Unhealthy,
            consecutive_fails,
            consecutive_successes,
            last_probe_unix: Some(0),
        }
    }

    #[test]
    fn cloudflare_quick_builder() {
        let endpoint = Endpoint::cloudflare_quick("https://foo.trycloudflare.com");
        assert_eq!(endpoint.kind, "cloudflare_quick");
        assert_eq!(endpoint.url, "https://foo.trycloudflare.com");
        assert_eq!(endpoint.priority, 0);
        assert!(endpoint.is_known_kind());
    }

    #[test]
    fn unknown_kind_preserved_and_flagged() {
        let endpoint = Endpoint {
            kind: String::from("future_transport_v9"),
            url: String::from("future:alien@mars:443"),
            priority: 5,
            health_hint_unix: None,
            health: None,
        };
        assert!(!endpoint.is_known_kind());
    }

    #[test]
    fn serde_roundtrip_minimal() {
        let original = Endpoint::cloudflare_quick("https://x.trycloudflare.com");
        let encoded = serde_json::to_string(&original).unwrap();
        // Priority 0 equals the default yet is still written out; the
        // health hint must be omitted entirely.
        for fragment in [
            r#""kind":"cloudflare_quick""#,
            r#""url":"https://x.trycloudflare.com""#,
        ] {
            assert!(encoded.contains(fragment));
        }
        assert!(!encoded.contains("health_hint_unix"));
        let decoded: Endpoint = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn serde_roundtrip_with_health_hint() {
        let original = Endpoint {
            kind: String::from(Endpoint::KIND_IROH),
            url: String::from("iroh:abc123@relay.aex.dev:443"),
            priority: 1,
            health_hint_unix: Some(1_700_000_000),
            health: None,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains(r#""health_hint_unix":1700000000"#));
        let decoded: Endpoint = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn endpoint_health_fresh_is_healthy() {
        let fresh = EndpointHealth::fresh_healthy(1_700_000_000);
        assert_eq!(fresh.status, HealthStatus::Healthy);
        assert_eq!(fresh.consecutive_fails, 0);
        assert_eq!(fresh.consecutive_successes, 0);
        assert_eq!(fresh.last_probe_unix, Some(1_700_000_000));
    }

    #[test]
    fn health_flips_to_unhealthy_after_three_fails() {
        let mut state = EndpointHealth::fresh_healthy(0);
        // The first two failures are absorbed without a status change.
        for (probe_time, label) in [(1, "1 fail: still healthy"), (2, "2 fails: still healthy")] {
            state = state.on_probe_failure(probe_time);
            assert_eq!(state.status, HealthStatus::Healthy, "{label}");
        }
        state = state.on_probe_failure(3);
        assert_eq!(
            state.status,
            HealthStatus::Unhealthy,
            "3rd fail must flip to unhealthy"
        );
        assert_eq!(state.last_probe_unix, Some(3));
    }

    #[test]
    fn health_stays_unhealthy_after_one_success() {
        let state = unhealthy(0, 0).on_probe_success(1);
        assert_eq!(
            state.status,
            HealthStatus::Unhealthy,
            "1 success is not enough to heal"
        );
        assert_eq!(state.consecutive_successes, 1);
    }

    #[test]
    fn health_heals_after_two_successes() {
        let healed = unhealthy(2, 0).on_probe_success(1).on_probe_success(2);
        assert_eq!(healed.status, HealthStatus::Healthy);
        assert_eq!(
            healed.consecutive_fails, 0,
            "healing must reset the fail counter"
        );
        assert_eq!(
            healed.consecutive_successes, 0,
            "counter resets after a flip so the state machine is fresh again"
        );
    }

    #[test]
    fn success_resets_fail_counter_without_flipping() {
        // Two failures are one short of the threshold, so the endpoint
        // is still Healthy. A success in between must clear the tally so
        // that a later failure does not stack on top of the stale ones.
        let two_fails = EndpointHealth::fresh_healthy(0)
            .on_probe_failure(1)
            .on_probe_failure(2);
        assert_eq!(two_fails.consecutive_fails, 2);
        let recovered = two_fails.on_probe_success(3);
        assert_eq!(recovered.consecutive_fails, 0);
        assert_eq!(recovered.status, HealthStatus::Healthy);
    }

    #[test]
    fn failure_resets_success_counter() {
        // Halfway through healing (one success banked), a failure sends
        // the success count back to zero: the two successes needed to
        // heal have to be consecutive.
        let state = unhealthy(0, 1).on_probe_failure(1);
        assert_eq!(state.consecutive_successes, 0);
        assert_eq!(state.status, HealthStatus::Unhealthy);
    }

    #[test]
    fn counters_are_saturated_not_wrapping() {
        // A long run of successful probes on a healthy endpoint must
        // leave the u8 success counter capped at the threshold rather
        // than growing without bound.
        let state = (1..=10).fold(EndpointHealth::fresh_healthy(0), |acc, probe_time| {
            acc.on_probe_success(probe_time)
        });
        assert!(state.consecutive_successes <= EndpointHealth::SUCCESS_THRESHOLD);
        assert_eq!(state.status, HealthStatus::Healthy);
    }

    #[test]
    fn health_round_trips_through_json() {
        let original = EndpointHealth {
            last_probe_unix: Some(1_700_000_000),
            ..unhealthy(3, 0)
        };
        let encoded = serde_json::to_string(&original).unwrap();
        // The status is written in lowercase to keep the JSONB readable.
        assert!(encoded.contains(r#""status":"unhealthy""#));
        let decoded: EndpointHealth = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn deserialize_preserves_unknown_kind() {
        let payload = r#"{"kind":"unknown_transport","url":"x://y","priority":9}"#;
        let endpoint: Endpoint = serde_json::from_str(payload).unwrap();
        assert_eq!(endpoint.kind, "unknown_transport");
        assert!(!endpoint.is_known_kind());
    }

    #[test]
    fn priority_defaults_to_zero_when_missing() {
        let payload = r#"{"kind":"cloudflare_quick","url":"https://x.trycloudflare.com"}"#;
        let endpoint: Endpoint = serde_json::from_str(payload).unwrap();
        assert_eq!(endpoint.priority, 0);
        assert_eq!(endpoint.health_hint_unix, None);
    }

    #[test]
    fn known_kinds_covers_sprint_2_transports() {
        let sprint_2_kinds = [
            Endpoint::KIND_CLOUDFLARE_QUICK,
            Endpoint::KIND_CLOUDFLARE_NAMED,
            Endpoint::KIND_IROH,
            Endpoint::KIND_TAILSCALE_FUNNEL,
            Endpoint::KIND_FRP,
        ];
        for k in sprint_2_kinds {
            assert!(Endpoint::KNOWN_KINDS.contains(&k), "kind {k} missing");
        }
    }
}