Skip to main content

sozu_command_lib/
state.rs

1use std::{
2    collections::{
3        BTreeMap, BTreeSet, HashMap, HashSet, btree_map::Entry as BTreeMapEntry,
4        hash_map::DefaultHasher,
5    },
6    fs::File,
7    hash::{Hash, Hasher},
8    io::Write,
9    iter::repeat,
10    net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16    ObjectKind,
17    certificate::{CertificateError, Fingerprint, calculate_fingerprint},
18    proto::{
19        command::{
20            ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster,
21            ClusterInformation, CustomHttpAnswers, DeactivateListener, FrontendFilters,
22            HealthChecksList, HttpListenerConfig, HttpsListenerConfig, InitialState,
23            ListedFrontends, ListenerType, ListenersList, PathRule, QueryCertificatesFilters,
24            RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate, Request,
25            RequestCounts, RequestHttpFrontend, RequestTcpFrontend, RequestUdpFrontend,
26            SetHealthCheck, SocketAddress, TcpListenerConfig, UdpListenerConfig,
27            UpdateHttpListenerConfig, UpdateHttpsListenerConfig, UpdateTcpListenerConfig,
28            UpdateUdpListenerConfig, WorkerRequest, request::RequestType,
29        },
30        display::format_request_type,
31    },
32    response::{Backend, HttpFrontend, TcpFrontend, UdpFrontend},
33};
34
35/// To use throughout Sōzu
36pub type ClusterId = String;
37
38#[derive(thiserror::Error, Debug)]
39pub enum StateError {
40    #[error("Request came in empty")]
41    EmptyRequest,
42    #[error("dispatching this request did not bring any change to the state")]
43    NoChange,
44    #[error("State can not handle this request")]
45    UndispatchableRequest,
46    #[error("Did not find {kind:?} with address or id '{id}'")]
47    NotFound { kind: ObjectKind, id: String },
48    #[error("{kind:?} '{id}' already exists")]
49    Exists { kind: ObjectKind, id: String },
50    #[error("Wrong field value: {0}")]
51    WrongFieldValue(UnknownEnumValue),
52    #[error("Could not add certificate: {0}")]
53    AddCertificate(CertificateError),
54    #[error("Could not remove certificate: {0}")]
55    RemoveCertificate(String),
56    #[error("Could not replace certificate: {0}")]
57    ReplaceCertificate(String),
58    #[error(
59        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
60    )]
61    FrontendConversion { frontend: String, error: String },
62    #[error("Could not write state to file: {0}")]
63    FileError(std::io::Error),
64    #[error("Invalid value for field '{field}': {reason}")]
65    InvalidValue {
66        field: &'static str,
67        reason: &'static str,
68    },
69}
70
71/// The `ConfigState` represents the state of Sōzu's business, which is to forward traffic
72/// from frontends to backends. Hence, it contains all details about:
73///
74/// - listeners (socket addresses, for TCP and HTTP connections)
75/// - frontends (bind to a listener)
76/// - backends (to forward connections to)
77/// - clusters (routing rules from frontends to backends)
78/// - TLS certificates
79#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
80pub struct ConfigState {
81    pub clusters: BTreeMap<ClusterId, Cluster>,
82    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
83    /// socket address -> HTTP listener
84    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
85    /// socket address -> HTTPS listener
86    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
87    /// socket address -> TCP listener
88    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
89    /// socket address -> UDP listener
90    pub udp_listeners: BTreeMap<SocketAddr, UdpListenerConfig>,
91    /// HTTP frontends, indexed by a summary of each front's address;hostname;path, for uniqueness.
92    /// For example: `"0.0.0.0:8080;lolcatho.st;P/api"`
93    pub http_fronts: BTreeMap<String, HttpFrontend>,
94    /// indexed by (address, hostname, path)
95    pub https_fronts: BTreeMap<String, HttpFrontend>,
96    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
97    pub udp_fronts: HashMap<ClusterId, Vec<UdpFrontend>>,
98    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
99    /// A census of requests that were received. Name of the request -> number of occurences
100    pub request_counts: BTreeMap<String, i32>,
101}
102
103impl ConfigState {
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
109        let request_type = match &request.request_type {
110            Some(t) => t,
111            None => return Err(StateError::EmptyRequest),
112        };
113
114        self.increment_request_count(request);
115
116        let result = match request_type {
117            RequestType::AddCluster(cluster) => self.add_cluster(cluster),
118            RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
119            RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
120            RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
121            RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
122            RequestType::AddUdpListener(listener) => self.add_udp_listener(listener),
123            RequestType::RemoveListener(remove) => self.remove_listener(remove),
124            RequestType::ActivateListener(activate) => self.activate_listener(activate),
125            RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
126            RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
127            RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
128            RequestType::AddCertificate(add) => self.add_certificate(add),
129            RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
130            RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
131            RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
132            RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
133            RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
134            RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
135            RequestType::AddUdpFrontend(front) => self.add_udp_frontend(front),
136            RequestType::RemoveUdpFrontend(front) => self.remove_udp_frontend(front),
137            RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
138            RequestType::RemoveBackend(backend) => self.remove_backend(backend),
139            RequestType::UpdateHttpListener(patch) => self.update_http_listener(patch),
140            RequestType::UpdateHttpsListener(patch) => self.update_https_listener(patch),
141            RequestType::UpdateTcpListener(patch) => self.update_tcp_listener(patch),
142            RequestType::UpdateUdpListener(patch) => self.update_udp_listener(patch),
143            RequestType::SetHealthCheck(set) => self.set_health_check(set),
144            RequestType::RemoveHealthCheck(cluster_id) => self.remove_health_check(cluster_id),
145
146            // This is to avoid the error message. These request types are
147            // worker-only / runtime-only and do not affect the persisted
148            // ConfigState (e.g., a worker-side global limit set via
149            // SetMaxConnectionsPerIp does NOT survive a worker restart;
150            // operators must mirror the change in the TOML to make it
151            // sticky).
152            RequestType::Logging(_)
153            | RequestType::CountRequests(_)
154            | RequestType::Status(_)
155            | RequestType::SoftStop(_)
156            | RequestType::QueryCertificatesFromWorkers(_)
157            | RequestType::QueryClusterById(_)
158            | RequestType::QueryClustersByDomain(_)
159            | RequestType::QueryMetrics(_)
160            | RequestType::QueryClustersHashes(_)
161            | RequestType::ConfigureMetrics(_)
162            | RequestType::SetMetricDetail(_)
163            | RequestType::ReturnListenSockets(_)
164            | RequestType::SetMaxConnectionsPerIp(_)
165            | RequestType::QueryMaxConnectionsPerIp(_)
166            | RequestType::HardStop(_) => Ok(()),
167
168            _other_request => Err(StateError::UndispatchableRequest),
169        };
170
171        // Run-to-completion postcondition: whatever path `dispatch` took, the
172        // cross-map invariants of the model must hold once it returns. We run
173        // the full sweep on both success and error: a failed mutating handler
174        // (e.g. a duplicate `add_*` or an absent `remove_*`) is required to be
175        // a no-op, so the invariants must be intact regardless of the result.
176        #[cfg(debug_assertions)]
177        self.check_invariants();
178
179        result
180    }
181
182    /// Full cross-map invariant sweep for the control-plane state model.
183    ///
184    /// This is the run-to-completion postcondition called via a
185    /// `#[cfg(debug_assertions)]` guard at the end of [`Self::dispatch`]. It
186    /// encodes the coherence invariants the diff/replay machinery relies on:
187    /// every map entry is self-consistent (a value's stored key/cluster_id
188    /// matches the key it is filed under), and the public accounting helpers
189    /// (`count_frontends`/`count_backends`) agree with the raw map contents.
190    ///
191    /// Compiled out entirely in release builds (no body, no callers).
192    #[cfg(debug_assertions)]
193    fn check_invariants(&self) {
194        // Listener maps: the value's `address` field must match the SocketAddr
195        // key it is filed under, or a hot-upgrade replay (which re-derives the
196        // key from the value) would land the entry under a different key.
197        for (addr, listener) in &self.http_listeners {
198            debug_assert_eq!(
199                SocketAddr::from(listener.address),
200                *addr,
201                "http_listener value address must match its map key"
202            );
203        }
204        for (addr, listener) in &self.https_listeners {
205            debug_assert_eq!(
206                SocketAddr::from(listener.address),
207                *addr,
208                "https_listener value address must match its map key"
209            );
210        }
211        for (addr, listener) in &self.tcp_listeners {
212            debug_assert_eq!(
213                SocketAddr::from(listener.address),
214                *addr,
215                "tcp_listener value address must match its map key"
216            );
217        }
218
219        // Clusters: the value's `cluster_id` must match the key it is filed
220        // under (replay re-keys on `cluster.cluster_id`).
221        for (cluster_id, cluster) in &self.clusters {
222            debug_assert_eq!(
223                &cluster.cluster_id, cluster_id,
224                "cluster value cluster_id must match its map key"
225            );
226        }
227
228        // Backends: grouped by cluster_id. Every backend in a bucket must carry
229        // that bucket's cluster_id (replay groups on `backend.cluster_id`), and
230        // the per-cluster Vec must stay deduplicated on (backend_id, address) —
231        // `add_backend` upserts, so duplicates would mean lost state.
232        for (cluster_id, backends) in &self.backends {
233            for backend in backends {
234                debug_assert_eq!(
235                    &backend.cluster_id, cluster_id,
236                    "backend cluster_id must match its bucket key"
237                );
238            }
239            let unique: HashSet<(&String, &SocketAddr)> = backends
240                .iter()
241                .map(|b| (&b.backend_id, &b.address))
242                .collect();
243            debug_assert_eq!(
244                unique.len(),
245                backends.len(),
246                "backends within a cluster must be unique on (backend_id, address)"
247            );
248        }
249
250        // TCP frontends: grouped by cluster_id. Every frontend in a bucket must
251        // carry that bucket's cluster_id, and `add_tcp_frontend` rejects exact
252        // duplicates so each bucket stays a set.
253        for (cluster_id, fronts) in &self.tcp_fronts {
254            for front in fronts {
255                debug_assert_eq!(
256                    &front.cluster_id, cluster_id,
257                    "tcp_frontend cluster_id must match its bucket key"
258                );
259            }
260            let unique: HashSet<&TcpFrontend> = fronts.iter().collect();
261            debug_assert_eq!(
262                unique.len(),
263                fronts.len(),
264                "tcp frontends within a cluster must be unique"
265            );
266        }
267
268        // Certificates: nested map keyed by (address, fingerprint). The inner
269        // map's fingerprint key is the addressing identity used by diff; we do
270        // not recompute it here (expensive), but the outer/inner structure must
271        // not hold an empty inner map silently produced outside the API — an
272        // empty bucket is a benign no-op for diff/replay, so we only assert the
273        // address-key relationship is preserved by construction (trivially true
274        // for a BTree/HashMap), leaving the costly fingerprint recompute out.
275
276        // Public accounting helpers must agree with the raw maps. These are the
277        // numbers the CLI and metrics surface; a drift here is a real bug.
278        let raw_frontends = self.http_fronts.len()
279            + self.https_fronts.len()
280            + self.count_tcp_frontends_raw()
281            + self.udp_fronts.values().map(|v| v.len()).sum::<usize>();
282        debug_assert_eq!(
283            self.count_frontends(),
284            raw_frontends,
285            "count_frontends must equal the sum of all frontend map entries"
286        );
287        let raw_backends: usize = self.backends.values().map(|v| v.len()).sum();
288        debug_assert_eq!(
289            self.count_backends(),
290            raw_backends,
291            "count_backends must equal the sum of all backend Vec lengths"
292        );
293    }
294
295    /// Raw count of TCP frontends across all clusters — debug-only helper used
296    /// by [`Self::check_invariants`] to cross-check `count_frontends`.
297    #[cfg(debug_assertions)]
298    fn count_tcp_frontends_raw(&self) -> usize {
299        self.tcp_fronts.values().map(|v| v.len()).sum()
300    }
301
302    /// Increments the count for this request type
303    fn increment_request_count(&mut self, request: &Request) {
304        if let Some(request_type) = &request.request_type {
305            let count = self
306                .request_counts
307                .entry(format_request_type(request_type).to_owned())
308                .or_insert(1);
309            *count += 1;
310        }
311    }
312
313    pub fn get_request_counts(&self) -> RequestCounts {
314        RequestCounts {
315            map: self.request_counts.clone(),
316        }
317    }
318
319    fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
320        // Validate any inline `cluster.health_check` before mutating state so
321        // an invalid config (zero thresholds, missing leading `/`, CR/LF/NUL/C0
322        // in URI) cannot ride in via the AddCluster path. Without this, TOML
323        // reload, SaveState/LoadState, and direct API AddCluster requests
324        // bypass the SetHealthCheck-side check and let an attacker-controlled
325        // health-check URI smuggle CRLF into outbound HTTP/1.1 probes.
326        if let Some(hc) = cluster.health_check.as_ref() {
327            if let Err(reason) = crate::config::validate_health_check_config(hc) {
328                return Err(StateError::InvalidValue {
329                    field: "health_check",
330                    reason,
331                });
332            }
333        }
334        let cluster = cluster.clone();
335        // AddCluster is an upsert (replacing an existing cluster_id keeps the
336        // entry count flat), so we assert on presence/key-coherence rather than
337        // a strict +1 on len.
338        let cluster_id = cluster.cluster_id.clone();
339        self.clusters.insert(cluster_id.clone(), cluster);
340        debug_assert!(
341            self.clusters.contains_key(&cluster_id),
342            "add_cluster must leave the cluster present in the map"
343        );
344        debug_assert_eq!(
345            self.clusters.get(&cluster_id).map(|c| &c.cluster_id),
346            Some(&cluster_id),
347            "stored cluster must be keyed by its own cluster_id"
348        );
349        Ok(())
350    }
351
352    fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
353        let before = self.clusters.len();
354        match self.clusters.remove(cluster_id) {
355            Some(_) => {
356                debug_assert!(
357                    !self.clusters.contains_key(cluster_id),
358                    "remove_cluster must evict the cluster"
359                );
360                debug_assert_eq!(
361                    self.clusters.len(),
362                    before - 1,
363                    "remove_cluster must drop exactly one entry"
364                );
365                Ok(())
366            }
367            None => {
368                debug_assert_eq!(
369                    self.clusters.len(),
370                    before,
371                    "a failed remove_cluster must not mutate the map"
372                );
373                Err(StateError::NotFound {
374                    kind: ObjectKind::Cluster,
375                    id: cluster_id.to_owned(),
376                })
377            }
378        }
379    }
380
381    fn set_health_check(&mut self, set: &SetHealthCheck) -> Result<(), StateError> {
382        // Validate before mutating state so an invalid config (zero
383        // thresholds, missing leading `/`, CR/LF/NUL/C0 in URI) cannot
384        // round-trip through SaveState/LoadState. The worker also
385        // validates at the SetHealthCheck handler — this is the
386        // master-side mirror so off-channel TOML reload paths don't
387        // bypass the policy.
388        if let Err(reason) = crate::config::validate_health_check_config(&set.config) {
389            return Err(StateError::InvalidValue {
390                field: "health_check",
391                reason,
392            });
393        }
394        match self.clusters.get_mut(&set.cluster_id) {
395            Some(cluster) => {
396                cluster.health_check = Some(set.config.to_owned());
397                Ok(())
398            }
399            None => Err(StateError::NotFound {
400                kind: ObjectKind::Cluster,
401                id: set.cluster_id.to_owned(),
402            }),
403        }
404    }
405
406    fn remove_health_check(&mut self, cluster_id: &str) -> Result<(), StateError> {
407        match self.clusters.get_mut(cluster_id) {
408            Some(cluster) => {
409                cluster.health_check = None;
410                Ok(())
411            }
412            None => Err(StateError::NotFound {
413                kind: ObjectKind::Cluster,
414                id: cluster_id.to_owned(),
415            }),
416        }
417    }
418
419    pub fn list_health_checks(&self, cluster_id: Option<&str>) -> HealthChecksList {
420        let map = self
421            .clusters
422            .iter()
423            .filter(|(id, _)| cluster_id.is_none_or(|filter| filter == id.as_str()))
424            .filter_map(|(id, cluster)| {
425                cluster
426                    .health_check
427                    .as_ref()
428                    .map(|hc| (id.to_owned(), hc.to_owned()))
429            })
430            .collect();
431        HealthChecksList { map }
432    }
433
434    fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
435        let address: SocketAddr = listener.address.into();
436        let before = self.http_listeners.len();
437        match self.http_listeners.entry(address) {
438            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
439            BTreeMapEntry::Occupied(_) => {
440                debug_assert_eq!(
441                    self.http_listeners.len(),
442                    before,
443                    "a rejected duplicate add_http_listener must not mutate the map"
444                );
445                return Err(StateError::Exists {
446                    kind: ObjectKind::HttpListener,
447                    id: address.to_string(),
448                });
449            }
450        };
451        debug_assert!(
452            self.http_listeners.contains_key(&address),
453            "add_http_listener must insert the listener under its address"
454        );
455        debug_assert_eq!(
456            self.http_listeners.len(),
457            before + 1,
458            "add_http_listener inserts exactly one entry on the vacant path"
459        );
460        Ok(())
461    }
462
463    fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
464        let address: SocketAddr = listener.address.into();
465        let before = self.https_listeners.len();
466        match self.https_listeners.entry(address) {
467            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
468            BTreeMapEntry::Occupied(_) => {
469                debug_assert_eq!(
470                    self.https_listeners.len(),
471                    before,
472                    "a rejected duplicate add_https_listener must not mutate the map"
473                );
474                return Err(StateError::Exists {
475                    kind: ObjectKind::HttpsListener,
476                    id: address.to_string(),
477                });
478            }
479        };
480        debug_assert!(
481            self.https_listeners.contains_key(&address),
482            "add_https_listener must insert the listener under its address"
483        );
484        debug_assert_eq!(
485            self.https_listeners.len(),
486            before + 1,
487            "add_https_listener inserts exactly one entry on the vacant path"
488        );
489        Ok(())
490    }
491
492    fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
493        let address: SocketAddr = listener.address.into();
494        let before = self.tcp_listeners.len();
495        match self.tcp_listeners.entry(address) {
496            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
497            BTreeMapEntry::Occupied(_) => {
498                debug_assert_eq!(
499                    self.tcp_listeners.len(),
500                    before,
501                    "a rejected duplicate add_tcp_listener must not mutate the map"
502                );
503                return Err(StateError::Exists {
504                    kind: ObjectKind::TcpListener,
505                    id: address.to_string(),
506                });
507            }
508        };
509        debug_assert!(
510            self.tcp_listeners.contains_key(&address),
511            "add_tcp_listener must insert the listener under its address"
512        );
513        debug_assert_eq!(
514            self.tcp_listeners.len(),
515            before + 1,
516            "add_tcp_listener inserts exactly one entry on the vacant path"
517        );
518        Ok(())
519    }
520
521    fn add_udp_listener(&mut self, listener: &UdpListenerConfig) -> Result<(), StateError> {
522        let address: SocketAddr = listener.address.into();
523        match self.udp_listeners.entry(address) {
524            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
525            BTreeMapEntry::Occupied(_) => {
526                return Err(StateError::Exists {
527                    kind: ObjectKind::UdpListener,
528                    id: address.to_string(),
529                });
530            }
531        };
532        Ok(())
533    }
534
535    fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
536        match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
537            ListenerType::Http => self.remove_http_listener(&remove.address.into()),
538            ListenerType::Https => self.remove_https_listener(&remove.address.into()),
539            ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
540            ListenerType::Udp => self.remove_udp_listener(&remove.address.into()),
541        }
542    }
543
544    fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
545        let before = self.http_listeners.len();
546        if self.http_listeners.remove(address).is_none() {
547            debug_assert_eq!(
548                self.http_listeners.len(),
549                before,
550                "a failed remove_http_listener must not mutate the map"
551            );
552            return Err(StateError::NoChange);
553        }
554        debug_assert!(
555            !self.http_listeners.contains_key(address),
556            "remove_http_listener must evict the address"
557        );
558        debug_assert_eq!(
559            self.http_listeners.len(),
560            before - 1,
561            "remove_http_listener drops exactly one entry"
562        );
563        Ok(())
564    }
565
566    fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
567        let before = self.https_listeners.len();
568        if self.https_listeners.remove(address).is_none() {
569            debug_assert_eq!(
570                self.https_listeners.len(),
571                before,
572                "a failed remove_https_listener must not mutate the map"
573            );
574            return Err(StateError::NoChange);
575        }
576        debug_assert!(
577            !self.https_listeners.contains_key(address),
578            "remove_https_listener must evict the address"
579        );
580        debug_assert_eq!(
581            self.https_listeners.len(),
582            before - 1,
583            "remove_https_listener drops exactly one entry"
584        );
585        Ok(())
586    }
587
588    fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
589        let before = self.tcp_listeners.len();
590        if self.tcp_listeners.remove(address).is_none() {
591            debug_assert_eq!(
592                self.tcp_listeners.len(),
593                before,
594                "a failed remove_tcp_listener must not mutate the map"
595            );
596            return Err(StateError::NoChange);
597        }
598        debug_assert!(
599            !self.tcp_listeners.contains_key(address),
600            "remove_tcp_listener must evict the address"
601        );
602        debug_assert_eq!(
603            self.tcp_listeners.len(),
604            before - 1,
605            "remove_tcp_listener drops exactly one entry"
606        );
607        Ok(())
608    }
609
610    fn remove_udp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
611        if self.udp_listeners.remove(address).is_none() {
612            return Err(StateError::NoChange);
613        }
614        Ok(())
615    }
616
617    /// Validate and apply a partial patch to an existing HTTP listener.
618    ///
619    /// Only `Some` fields in the patch are written; `None` fields preserve the
620    /// current value. Returns `StateError::NotFound` if the address is unknown,
621    /// `StateError::InvalidValue` if a flood-knob value is below the required
622    /// minimum.
623    fn update_http_listener(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
624        validate_h2_flood_knobs_http(patch)?;
625
626        let address: SocketAddr = patch.address.into();
627        let listener =
628            self.http_listeners
629                .get_mut(&address)
630                .ok_or_else(|| StateError::NotFound {
631                    kind: ObjectKind::HttpListener,
632                    id: address.to_string(),
633                })?;
634
635        // Shared session-at-accept / per-connection knobs
636        if let Some(v) = patch.public_address {
637            listener.public_address = Some(v);
638        }
639        if let Some(v) = patch.expect_proxy {
640            listener.expect_proxy = v;
641        }
642        if let Some(ref v) = patch.sticky_name {
643            listener.sticky_name = v.to_owned();
644        }
645        if let Some(v) = patch.front_timeout {
646            listener.front_timeout = v;
647        }
648        if let Some(v) = patch.back_timeout {
649            listener.back_timeout = v;
650        }
651        if let Some(v) = patch.connect_timeout {
652            listener.connect_timeout = v;
653        }
654        if let Some(v) = patch.request_timeout {
655            listener.request_timeout = v;
656        }
657        if let Some(patch_answers) = patch.http_answers.as_ref() {
658            merge_custom_http_answers(&mut listener.http_answers, patch_answers);
659        }
660        // H2 flood knobs
661        if let Some(v) = patch.h2_max_rst_stream_per_window {
662            listener.h2_max_rst_stream_per_window = Some(v);
663        }
664        if let Some(v) = patch.h2_max_ping_per_window {
665            listener.h2_max_ping_per_window = Some(v);
666        }
667        if let Some(v) = patch.h2_max_settings_per_window {
668            listener.h2_max_settings_per_window = Some(v);
669        }
670        if let Some(v) = patch.h2_max_empty_data_per_window {
671            listener.h2_max_empty_data_per_window = Some(v);
672        }
673        if let Some(v) = patch.h2_max_continuation_frames {
674            listener.h2_max_continuation_frames = Some(v);
675        }
676        if let Some(v) = patch.h2_max_glitch_count {
677            listener.h2_max_glitch_count = Some(v);
678        }
679        if let Some(v) = patch.h2_initial_connection_window {
680            listener.h2_initial_connection_window = Some(v);
681        }
682        if let Some(v) = patch.h2_max_concurrent_streams {
683            listener.h2_max_concurrent_streams = Some(v);
684        }
685        if let Some(v) = patch.h2_stream_shrink_ratio {
686            listener.h2_stream_shrink_ratio = Some(v);
687        }
688        if let Some(v) = patch.h2_max_rst_stream_lifetime {
689            listener.h2_max_rst_stream_lifetime = Some(v);
690        }
691        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
692            listener.h2_max_rst_stream_abusive_lifetime = Some(v);
693        }
694        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
695            listener.h2_max_rst_stream_emitted_lifetime = Some(v);
696        }
697        if let Some(v) = patch.h2_max_header_list_size {
698            listener.h2_max_header_list_size = Some(v);
699        }
700        if let Some(v) = patch.h2_max_header_table_size {
701            listener.h2_max_header_table_size = Some(v);
702        }
703        if let Some(v) = patch.h2_max_header_fields {
704            listener.h2_max_header_fields = Some(v);
705        }
706        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
707            listener.h2_stream_idle_timeout_seconds = Some(v);
708        }
709        // 0 is valid for graceful_shutdown_deadline (means "wait forever")
710        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
711            listener.h2_graceful_shutdown_deadline_seconds = Some(v);
712        }
713        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
714            listener.h2_max_window_update_stream0_per_window = Some(v);
715        }
716        if let Some(ref v) = patch.sozu_id_header {
717            validate_sozu_id_header(v)?;
718            listener.sozu_id_header = Some(v.to_owned());
719        }
720        Ok(())
721    }
722
723    /// Validate and apply a partial patch to an existing HTTPS listener.
724    ///
725    /// Only `Some` fields in the patch are written; `None` fields preserve the
726    /// current value. Returns `StateError::NotFound` if the address is unknown,
727    /// `StateError::InvalidValue` if a flood-knob value is below the required
728    /// minimum or an ALPN value is unknown.
729    fn update_https_listener(
730        &mut self,
731        patch: &UpdateHttpsListenerConfig,
732    ) -> Result<(), StateError> {
733        validate_h2_flood_knobs_https(patch)?;
734
735        let address: SocketAddr = patch.address.into();
736        let listener =
737            self.https_listeners
738                .get_mut(&address)
739                .ok_or_else(|| StateError::NotFound {
740                    kind: ObjectKind::HttpsListener,
741                    id: address.to_string(),
742                })?;
743
744        // Shared session-at-accept / per-connection knobs
745        if let Some(v) = patch.public_address {
746            listener.public_address = Some(v);
747        }
748        if let Some(v) = patch.expect_proxy {
749            listener.expect_proxy = v;
750        }
751        if let Some(ref v) = patch.sticky_name {
752            listener.sticky_name = v.to_owned();
753        }
754        if let Some(v) = patch.front_timeout {
755            listener.front_timeout = v;
756        }
757        if let Some(v) = patch.back_timeout {
758            listener.back_timeout = v;
759        }
760        if let Some(v) = patch.connect_timeout {
761            listener.connect_timeout = v;
762        }
763        if let Some(v) = patch.request_timeout {
764            listener.request_timeout = v;
765        }
766        if let Some(patch_answers) = patch.http_answers.as_ref() {
767            merge_custom_http_answers(&mut listener.http_answers, patch_answers);
768        }
769        // HTTPS-only knobs
770        if let Some(ref alpn_wrapper) = patch.alpn_protocols {
771            validate_alpn_protocols(&alpn_wrapper.values)?;
772            // Empty values vec = reset to default (runtime treats empty as default)
773            listener.alpn_protocols = alpn_wrapper.values.clone();
774        }
775        if let Some(v) = patch.strict_sni_binding {
776            listener.strict_sni_binding = Some(v);
777        }
778        if let Some(v) = patch.disable_http11 {
779            listener.disable_http11 = Some(v);
780        }
781        // H2 flood knobs
782        if let Some(v) = patch.h2_max_rst_stream_per_window {
783            listener.h2_max_rst_stream_per_window = Some(v);
784        }
785        if let Some(v) = patch.h2_max_ping_per_window {
786            listener.h2_max_ping_per_window = Some(v);
787        }
788        if let Some(v) = patch.h2_max_settings_per_window {
789            listener.h2_max_settings_per_window = Some(v);
790        }
791        if let Some(v) = patch.h2_max_empty_data_per_window {
792            listener.h2_max_empty_data_per_window = Some(v);
793        }
794        if let Some(v) = patch.h2_max_continuation_frames {
795            listener.h2_max_continuation_frames = Some(v);
796        }
797        if let Some(v) = patch.h2_max_glitch_count {
798            listener.h2_max_glitch_count = Some(v);
799        }
800        if let Some(v) = patch.h2_initial_connection_window {
801            listener.h2_initial_connection_window = Some(v);
802        }
803        if let Some(v) = patch.h2_max_concurrent_streams {
804            listener.h2_max_concurrent_streams = Some(v);
805        }
806        if let Some(v) = patch.h2_stream_shrink_ratio {
807            listener.h2_stream_shrink_ratio = Some(v);
808        }
809        if let Some(v) = patch.h2_max_rst_stream_lifetime {
810            listener.h2_max_rst_stream_lifetime = Some(v);
811        }
812        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
813            listener.h2_max_rst_stream_abusive_lifetime = Some(v);
814        }
815        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
816            listener.h2_max_rst_stream_emitted_lifetime = Some(v);
817        }
818        if let Some(v) = patch.h2_max_header_list_size {
819            listener.h2_max_header_list_size = Some(v);
820        }
821        if let Some(v) = patch.h2_max_header_table_size {
822            listener.h2_max_header_table_size = Some(v);
823        }
824        if let Some(v) = patch.h2_max_header_fields {
825            listener.h2_max_header_fields = Some(v);
826        }
827        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
828            listener.h2_stream_idle_timeout_seconds = Some(v);
829        }
830        // 0 is valid for graceful_shutdown_deadline (means "wait forever")
831        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
832            listener.h2_graceful_shutdown_deadline_seconds = Some(v);
833        }
834        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
835            listener.h2_max_window_update_stream0_per_window = Some(v);
836        }
837        if let Some(ref v) = patch.sozu_id_header {
838            validate_sozu_id_header(v)?;
839            listener.sozu_id_header = Some(v.to_owned());
840        }
841        Ok(())
842    }
843
844    /// Validate and apply a partial patch to an existing TCP listener.
845    ///
846    /// Only `Some` fields in the patch are written; `None` fields preserve the
847    /// current value. Returns `StateError::NotFound` if the address is unknown.
848    fn update_tcp_listener(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), StateError> {
849        let address: SocketAddr = patch.address.into();
850        let listener =
851            self.tcp_listeners
852                .get_mut(&address)
853                .ok_or_else(|| StateError::NotFound {
854                    kind: ObjectKind::TcpListener,
855                    id: address.to_string(),
856                })?;
857
858        if let Some(v) = patch.public_address {
859            listener.public_address = Some(v);
860        }
861        if let Some(v) = patch.expect_proxy {
862            listener.expect_proxy = v;
863        }
864        if let Some(v) = patch.front_timeout {
865            listener.front_timeout = v;
866        }
867        if let Some(v) = patch.back_timeout {
868            listener.back_timeout = v;
869        }
870        if let Some(v) = patch.connect_timeout {
871            listener.connect_timeout = v;
872        }
873        Ok(())
874    }
875
876    /// Validate and apply a partial patch to an existing UDP listener.
877    ///
878    /// Only `Some` fields in the patch are written; `None` fields preserve the
879    /// current value. Returns `StateError::NotFound` if the address is unknown.
880    fn update_udp_listener(&mut self, patch: &UpdateUdpListenerConfig) -> Result<(), StateError> {
881        let address: SocketAddr = patch.address.into();
882        let listener =
883            self.udp_listeners
884                .get_mut(&address)
885                .ok_or_else(|| StateError::NotFound {
886                    kind: ObjectKind::UdpListener,
887                    id: address.to_string(),
888                })?;
889
890        if let Some(v) = patch.public_address {
891            listener.public_address = Some(v);
892        }
893        if let Some(v) = patch.front_timeout {
894            listener.front_timeout = v;
895        }
896        if let Some(v) = patch.back_timeout {
897            listener.back_timeout = v;
898        }
899        if let Some(v) = patch.max_rx_datagram_size {
900            listener.max_rx_datagram_size = v;
901        }
902        if let Some(v) = patch.max_flows {
903            listener.max_flows = v;
904        }
905        Ok(())
906    }
907
908    fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
909        match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
910            ListenerType::Http => self
911                .http_listeners
912                .get_mut(&activate.address.into())
913                .map(|listener| listener.active = true)
914                .ok_or(StateError::NotFound {
915                    kind: ObjectKind::HttpListener,
916                    id: activate.address.to_string(),
917                }),
918            ListenerType::Https => self
919                .https_listeners
920                .get_mut(&activate.address.into())
921                .map(|listener| listener.active = true)
922                .ok_or(StateError::NotFound {
923                    kind: ObjectKind::HttpsListener,
924                    id: activate.address.to_string(),
925                }),
926            ListenerType::Tcp => self
927                .tcp_listeners
928                .get_mut(&activate.address.into())
929                .map(|listener| listener.active = true)
930                .ok_or(StateError::NotFound {
931                    kind: ObjectKind::TcpListener,
932                    id: activate.address.to_string(),
933                }),
934            ListenerType::Udp => self
935                .udp_listeners
936                .get_mut(&activate.address.into())
937                .map(|listener| listener.active = true)
938                .ok_or(StateError::NotFound {
939                    kind: ObjectKind::UdpListener,
940                    id: activate.address.to_string(),
941                }),
942        }
943    }
944
945    fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
946        match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
947            ListenerType::Http => self
948                .http_listeners
949                .get_mut(&deactivate.address.into())
950                .map(|listener| listener.active = false)
951                .ok_or(StateError::NotFound {
952                    kind: ObjectKind::HttpListener,
953                    id: deactivate.address.to_string(),
954                }),
955            ListenerType::Https => self
956                .https_listeners
957                .get_mut(&deactivate.address.into())
958                .map(|listener| listener.active = false)
959                .ok_or(StateError::NotFound {
960                    kind: ObjectKind::HttpsListener,
961                    id: deactivate.address.to_string(),
962                }),
963            ListenerType::Tcp => self
964                .tcp_listeners
965                .get_mut(&deactivate.address.into())
966                .map(|listener| listener.active = false)
967                .ok_or(StateError::NotFound {
968                    kind: ObjectKind::TcpListener,
969                    id: deactivate.address.to_string(),
970                }),
971            ListenerType::Udp => self
972                .udp_listeners
973                .get_mut(&deactivate.address.into())
974                .map(|listener| listener.active = false)
975                .ok_or(StateError::NotFound {
976                    kind: ObjectKind::UdpListener,
977                    id: deactivate.address.to_string(),
978                }),
979        }
980    }
981
982    fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
983        let front_as_key = front.to_string();
984        let before = self.http_fronts.len();
985
986        match self.http_fronts.entry(front.to_string()) {
987            BTreeMapEntry::Vacant(e) => {
988                e.insert(front.clone().to_frontend().map_err(|into_error| {
989                    StateError::FrontendConversion {
990                        frontend: front_as_key,
991                        error: into_error.to_string(),
992                    }
993                })?)
994            }
995            BTreeMapEntry::Occupied(_) => {
996                debug_assert_eq!(
997                    self.http_fronts.len(),
998                    before,
999                    "a rejected duplicate add_http_frontend must not mutate the map"
1000                );
1001                return Err(StateError::Exists {
1002                    kind: ObjectKind::HttpFrontend,
1003                    id: front.to_string(),
1004                });
1005            }
1006        };
1007        // On the conversion-error path the `?` already returned, so reaching
1008        // here means exactly one entry was inserted under the route key.
1009        debug_assert!(
1010            self.http_fronts.contains_key(&front.to_string()),
1011            "add_http_frontend must insert the route key on success"
1012        );
1013        debug_assert_eq!(
1014            self.http_fronts.len(),
1015            before + 1,
1016            "add_http_frontend inserts exactly one entry on success"
1017        );
1018        Ok(())
1019    }
1020
1021    fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1022        let front_as_key = front.to_string();
1023        let before = self.https_fronts.len();
1024
1025        match self.https_fronts.entry(front.to_string()) {
1026            BTreeMapEntry::Vacant(e) => {
1027                e.insert(front.clone().to_frontend().map_err(|into_error| {
1028                    StateError::FrontendConversion {
1029                        frontend: front_as_key,
1030                        error: into_error.to_string(),
1031                    }
1032                })?)
1033            }
1034            BTreeMapEntry::Occupied(_) => {
1035                debug_assert_eq!(
1036                    self.https_fronts.len(),
1037                    before,
1038                    "a rejected duplicate add_https_frontend must not mutate the map"
1039                );
1040                return Err(StateError::Exists {
1041                    kind: ObjectKind::HttpsFrontend,
1042                    id: front.to_string(),
1043                });
1044            }
1045        };
1046        debug_assert!(
1047            self.https_fronts.contains_key(&front.to_string()),
1048            "add_https_frontend must insert the route key on success"
1049        );
1050        debug_assert_eq!(
1051            self.https_fronts.len(),
1052            before + 1,
1053            "add_https_frontend inserts exactly one entry on success"
1054        );
1055        Ok(())
1056    }
1057
1058    fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1059        let key = front.to_string();
1060        let before = self.http_fronts.len();
1061        self.http_fronts.remove(&key).ok_or(StateError::NotFound {
1062            kind: ObjectKind::HttpFrontend,
1063            id: front.to_string(),
1064        })?;
1065        debug_assert!(
1066            !self.http_fronts.contains_key(&key),
1067            "remove_http_frontend must evict the route key"
1068        );
1069        debug_assert_eq!(
1070            self.http_fronts.len(),
1071            before - 1,
1072            "remove_http_frontend drops exactly one entry"
1073        );
1074        Ok(())
1075    }
1076
1077    fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1078        let key = front.to_string();
1079        let before = self.https_fronts.len();
1080        self.https_fronts.remove(&key).ok_or(StateError::NotFound {
1081            kind: ObjectKind::HttpsFrontend,
1082            id: front.to_string(),
1083        })?;
1084        debug_assert!(
1085            !self.https_fronts.contains_key(&key),
1086            "remove_https_frontend must evict the route key"
1087        );
1088        debug_assert_eq!(
1089            self.https_fronts.len(),
1090            before - 1,
1091            "remove_https_frontend drops exactly one entry"
1092        );
1093        Ok(())
1094    }
1095
1096    fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
1097        let fingerprint = add
1098            .certificate
1099            .fingerprint()
1100            .map_err(StateError::AddCertificate)?;
1101
1102        let entry = self.certificates.entry(add.address.into()).or_default();
1103
1104        let mut add = add.clone();
1105        add.certificate
1106            .apply_overriding_names()
1107            .map_err(StateError::AddCertificate)?;
1108
1109        if entry.contains_key(&fingerprint) {
1110            info!(
1111                "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
1112                fingerprint,
1113                add.certificate.names.join(", "),
1114                add.address
1115            );
1116            return Ok(());
1117        }
1118
1119        let before = entry.len();
1120        entry.insert(fingerprint.clone(), add.certificate);
1121        debug_assert!(
1122            entry.contains_key(&fingerprint),
1123            "add_certificate must insert the fingerprint under its address"
1124        );
1125        debug_assert_eq!(
1126            entry.len(),
1127            before + 1,
1128            "add_certificate inserts exactly one fingerprint on the new path"
1129        );
1130        Ok(())
1131    }
1132
1133    fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
1134        let fingerprint = Fingerprint(
1135            hex::decode(&remove.fingerprint)
1136                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
1137        );
1138
1139        if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
1140            index.remove(&fingerprint);
1141            debug_assert!(
1142                !index.contains_key(&fingerprint),
1143                "remove_certificate must evict the fingerprint when the address is known"
1144            );
1145        }
1146
1147        Ok(())
1148    }
1149
1150    /// - Remove old certificate from certificates, using the old fingerprint
1151    /// - calculate the new fingerprint
1152    /// - insert the new certificate with the new fingerprint as key
1153    /// - check that the new entry is present in the certificates hashmap
1154    fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
1155        let replace_address = replace.address.into();
1156        let old_fingerprint = Fingerprint(
1157            hex::decode(&replace.old_fingerprint)
1158                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
1159        );
1160
1161        self.certificates
1162            .get_mut(&replace_address)
1163            .ok_or(StateError::NotFound {
1164                kind: ObjectKind::Certificate,
1165                id: replace.address.to_string(),
1166            })?
1167            .remove(&old_fingerprint);
1168
1169        let new_fingerprint = Fingerprint(
1170            calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
1171                |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
1172            )?,
1173        );
1174
1175        self.certificates
1176            .get_mut(&replace_address)
1177            .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
1178
1179        if !self
1180            .certificates
1181            .get(&replace_address)
1182            .ok_or(StateError::ReplaceCertificate(
1183                "Unlikely error. This entry in the certificate hashmap should be present"
1184                    .to_string(),
1185            ))?
1186            .contains_key(&new_fingerprint)
1187        {
1188            return Err(StateError::ReplaceCertificate(format!(
1189                "Failed to insert the new certificate for address {}",
1190                replace.address
1191            )));
1192        }
1193        // Postcondition: the new fingerprint is keyed under the address, and
1194        // (unless old and new collide, e.g. a self-replace) the old one is gone.
1195        debug_assert!(
1196            self.certificates
1197                .get(&replace_address)
1198                .is_some_and(|certs| certs.contains_key(&new_fingerprint)),
1199            "replace_certificate must leave the new fingerprint present"
1200        );
1201        debug_assert!(
1202            new_fingerprint == old_fingerprint
1203                || self
1204                    .certificates
1205                    .get(&replace_address)
1206                    .is_none_or(|certs| !certs.contains_key(&old_fingerprint)),
1207            "replace_certificate must evict the old fingerprint unless it equals the new one"
1208        );
1209        Ok(())
1210    }
1211
1212    fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
1213        let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
1214
1215        let tcp_frontend = TcpFrontend {
1216            cluster_id: front.cluster_id.clone(),
1217            address: front.address.into(),
1218            tags: front.tags.clone(),
1219        };
1220        let before = tcp_frontends.len();
1221        if tcp_frontends.contains(&tcp_frontend) {
1222            debug_assert_eq!(
1223                tcp_frontends.len(),
1224                before,
1225                "a rejected duplicate add_tcp_frontend must not grow the bucket"
1226            );
1227            return Err(StateError::Exists {
1228                kind: ObjectKind::TcpFrontend,
1229                id: format!("{tcp_frontend:?}"),
1230            });
1231        }
1232
1233        debug_assert_eq!(
1234            tcp_frontend.cluster_id, front.cluster_id,
1235            "the built frontend must carry its bucket's cluster_id"
1236        );
1237        tcp_frontends.push(tcp_frontend);
1238        debug_assert_eq!(
1239            tcp_frontends.len(),
1240            before + 1,
1241            "add_tcp_frontend appends exactly one entry"
1242        );
1243        Ok(())
1244    }
1245
1246    fn remove_tcp_frontend(
1247        &mut self,
1248        front_to_remove: &RequestTcpFrontend,
1249    ) -> Result<(), StateError> {
1250        let tcp_frontends =
1251            self.tcp_fronts
1252                .get_mut(&front_to_remove.cluster_id)
1253                .ok_or(StateError::NotFound {
1254                    kind: ObjectKind::TcpFrontend,
1255                    id: format!("{front_to_remove:?}"),
1256                })?;
1257
1258        let len = tcp_frontends.len();
1259        let remove_address: SocketAddr = front_to_remove.address.into();
1260        tcp_frontends.retain(|front| front.address != remove_address);
1261        let after = tcp_frontends.len();
1262        if after == len {
1263            return Err(StateError::NoChange);
1264        }
1265        // `retain` may drop more than one entry only if duplicates on the same
1266        // address ever existed; `add_tcp_frontend` forbids that, so a
1267        // successful removal must drop exactly one and leave none matching.
1268        debug_assert_eq!(
1269            after,
1270            len - 1,
1271            "remove_tcp_frontend drops exactly one entry"
1272        );
1273        debug_assert!(
1274            !tcp_frontends.iter().any(|f| f.address == remove_address),
1275            "remove_tcp_frontend must leave no frontend at the removed address"
1276        );
1277        Ok(())
1278    }
1279
1280    fn add_udp_frontend(&mut self, front: &RequestUdpFrontend) -> Result<(), StateError> {
1281        let udp_frontends = self.udp_fronts.entry(front.cluster_id.clone()).or_default();
1282
1283        let udp_frontend = UdpFrontend {
1284            cluster_id: front.cluster_id.clone(),
1285            address: front.address.into(),
1286            tags: front.tags.clone(),
1287        };
1288        if udp_frontends.contains(&udp_frontend) {
1289            return Err(StateError::Exists {
1290                kind: ObjectKind::UdpFrontend,
1291                id: format!("{udp_frontend:?}"),
1292            });
1293        }
1294
1295        udp_frontends.push(udp_frontend);
1296        Ok(())
1297    }
1298
1299    fn remove_udp_frontend(
1300        &mut self,
1301        front_to_remove: &RequestUdpFrontend,
1302    ) -> Result<(), StateError> {
1303        let udp_frontends =
1304            self.udp_fronts
1305                .get_mut(&front_to_remove.cluster_id)
1306                .ok_or(StateError::NotFound {
1307                    kind: ObjectKind::UdpFrontend,
1308                    id: format!("{front_to_remove:?}"),
1309                })?;
1310
1311        let len = udp_frontends.len();
1312        udp_frontends.retain(|front| front.address != front_to_remove.address.into());
1313        if udp_frontends.len() == len {
1314            return Err(StateError::NoChange);
1315        }
1316        Ok(())
1317    }
1318
1319    fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
1320        let backend = Backend {
1321            address: add_backend.address.into(),
1322            cluster_id: add_backend.cluster_id.clone(),
1323            backend_id: add_backend.backend_id.clone(),
1324            sticky_id: add_backend.sticky_id.clone(),
1325            load_balancing_parameters: add_backend.load_balancing_parameters,
1326            backup: add_backend.backup,
1327        };
1328        let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
1329        let backend_id = backend.backend_id.clone();
1330        let backend_address = backend.address;
1331        let before = backends.len();
1332
1333        // we might be modifying the sticky id or load balancing parameters:
1334        // the retain drops at most one prior copy (the map stays deduplicated
1335        // on (backend_id, address)), then we re-push the new version. So the
1336        // net length grows by exactly one iff this was a brand-new backend.
1337        let was_present = backends
1338            .iter()
1339            .any(|b| b.backend_id == backend_id && b.address == backend_address);
1340        backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
1341        debug_assert_eq!(
1342            backends.len(),
1343            before - was_present as usize,
1344            "the upsert retain must drop exactly the prior copy iff it existed"
1345        );
1346        backends.push(backend);
1347        backends.sort();
1348
1349        debug_assert_eq!(
1350            backends.len(),
1351            before + (!was_present) as usize,
1352            "add_backend grows the bucket by one iff the backend was new"
1353        );
1354        debug_assert_eq!(
1355            backends
1356                .iter()
1357                .filter(|b| b.backend_id == backend_id && b.address == backend_address)
1358                .count(),
1359            1,
1360            "exactly one copy of the upserted backend must remain"
1361        );
1362        Ok(())
1363    }
1364
1365    fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
1366        let backend_list =
1367            self.backends
1368                .get_mut(&backend.cluster_id)
1369                .ok_or(StateError::NotFound {
1370                    kind: ObjectKind::Backend,
1371                    id: backend.backend_id.to_owned(),
1372                })?;
1373
1374        let len = backend_list.len();
1375        let remove_address: SocketAddr = backend.address.into();
1376        backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
1377        backend_list.sort();
1378        let after = backend_list.len();
1379        if after == len {
1380            return Err(StateError::NoChange);
1381        }
1382        // The list is deduplicated on (backend_id, address), so a matching
1383        // removal drops exactly one entry and leaves nothing matching.
1384        debug_assert_eq!(after, len - 1, "remove_backend drops exactly one entry");
1385        debug_assert!(
1386            !backend_list
1387                .iter()
1388                .any(|b| b.backend_id == backend.backend_id && b.address == remove_address),
1389            "remove_backend must leave no backend matching (backend_id, address)"
1390        );
1391        Ok(())
1392    }
1393
1394    /// creates all requests needed to bootstrap the state
1395    fn generate_requests(&self) -> Vec<Request> {
1396        let mut v: Vec<Request> = Vec::new();
1397
1398        for listener in self.http_listeners.values() {
1399            v.push(RequestType::AddHttpListener(listener.clone()).into());
1400            if listener.active {
1401                v.push(
1402                    RequestType::ActivateListener(ActivateListener {
1403                        address: listener.address,
1404                        proxy: ListenerType::Http.into(),
1405                        from_scm: false,
1406                    })
1407                    .into(),
1408                );
1409            }
1410        }
1411
1412        for listener in self.https_listeners.values() {
1413            v.push(RequestType::AddHttpsListener(listener.clone()).into());
1414            if listener.active {
1415                v.push(
1416                    RequestType::ActivateListener(ActivateListener {
1417                        address: listener.address,
1418                        proxy: ListenerType::Https.into(),
1419                        from_scm: false,
1420                    })
1421                    .into(),
1422                );
1423            }
1424        }
1425
1426        for listener in self.tcp_listeners.values() {
1427            v.push(RequestType::AddTcpListener(*listener).into());
1428            if listener.active {
1429                v.push(
1430                    RequestType::ActivateListener(ActivateListener {
1431                        address: listener.address,
1432                        proxy: ListenerType::Tcp.into(),
1433                        from_scm: false,
1434                    })
1435                    .into(),
1436                );
1437            }
1438        }
1439
1440        for listener in self.udp_listeners.values() {
1441            v.push(RequestType::AddUdpListener(*listener).into());
1442            if listener.active {
1443                v.push(
1444                    RequestType::ActivateListener(ActivateListener {
1445                        address: listener.address,
1446                        proxy: ListenerType::Udp.into(),
1447                        from_scm: false,
1448                    })
1449                    .into(),
1450                );
1451            }
1452        }
1453
1454        for cluster in self.clusters.values() {
1455            v.push(RequestType::AddCluster(cluster.clone()).into());
1456        }
1457
1458        for front in self.http_fronts.values() {
1459            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
1460        }
1461
1462        for (front, certs) in self.certificates.iter() {
1463            for certificate_and_key in certs.values() {
1464                v.push(
1465                    RequestType::AddCertificate(AddCertificate {
1466                        address: SocketAddress::from(*front),
1467                        certificate: certificate_and_key.clone(),
1468                        expired_at: None,
1469                    })
1470                    .into(),
1471                );
1472            }
1473        }
1474
1475        for front in self.https_fronts.values() {
1476            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
1477        }
1478
1479        for front_list in self.tcp_fronts.values() {
1480            for front in front_list {
1481                v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
1482            }
1483        }
1484
1485        for front_list in self.udp_fronts.values() {
1486            for front in front_list {
1487                v.push(RequestType::AddUdpFrontend(front.clone().into()).into());
1488            }
1489        }
1490
1491        for backend_list in self.backends.values() {
1492            for backend in backend_list {
1493                v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
1494            }
1495        }
1496
1497        // Bootstrap round-trip: replaying `generate_requests` into a fresh,
1498        // empty `ConfigState` must reconstruct `self`'s maps exactly. This is
1499        // the property SaveState/LoadState and worker bootstrap depend on. We
1500        // compare the business maps only (not `request_counts`, which is
1501        // bookkeeping mutated by `dispatch`).
1502        #[cfg(debug_assertions)]
1503        {
1504            let mut replayed = ConfigState::new();
1505            for request in &v {
1506                debug_assert!(
1507                    replayed.dispatch(request).is_ok(),
1508                    "every request from generate_requests must replay cleanly"
1509                );
1510            }
1511            debug_assert!(
1512                replayed.clusters == self.clusters
1513                    && replayed.backends == self.backends
1514                    && replayed.http_listeners == self.http_listeners
1515                    && replayed.https_listeners == self.https_listeners
1516                    && replayed.tcp_listeners == self.tcp_listeners
1517                    && replayed.http_fronts == self.http_fronts
1518                    && replayed.https_fronts == self.https_fronts
1519                    && replayed.tcp_fronts == self.tcp_fronts
1520                    && replayed.certificates == self.certificates,
1521                "replaying generate_requests into a fresh state must reproduce self"
1522            );
1523        }
1524
1525        v
1526    }
1527
1528    pub fn generate_activate_requests(&self) -> Vec<Request> {
1529        let mut v: Vec<Request> = Vec::new();
1530        for front in self
1531            .http_listeners
1532            .iter()
1533            .filter(|(_, listener)| listener.active)
1534            .map(|(k, _)| k)
1535        {
1536            v.push(
1537                RequestType::ActivateListener(ActivateListener {
1538                    address: SocketAddress::from(*front),
1539                    proxy: ListenerType::Http.into(),
1540                    from_scm: false,
1541                })
1542                .into(),
1543            );
1544        }
1545
1546        for front in self
1547            .https_listeners
1548            .iter()
1549            .filter(|(_, listener)| listener.active)
1550            .map(|(k, _)| k)
1551        {
1552            v.push(
1553                RequestType::ActivateListener(ActivateListener {
1554                    address: SocketAddress::from(*front),
1555                    proxy: ListenerType::Https.into(),
1556                    from_scm: false,
1557                })
1558                .into(),
1559            );
1560        }
1561        for front in self
1562            .tcp_listeners
1563            .iter()
1564            .filter(|(_, listener)| listener.active)
1565            .map(|(k, _)| k)
1566        {
1567            v.push(
1568                RequestType::ActivateListener(ActivateListener {
1569                    address: SocketAddress::from(*front),
1570                    proxy: ListenerType::Tcp.into(),
1571                    from_scm: false,
1572                })
1573                .into(),
1574            );
1575        }
1576        for front in self
1577            .udp_listeners
1578            .iter()
1579            .filter(|(_, listener)| listener.active)
1580            .map(|(k, _)| k)
1581        {
1582            v.push(
1583                RequestType::ActivateListener(ActivateListener {
1584                    address: SocketAddress::from(*front),
1585                    proxy: ListenerType::Udp.into(),
1586                    from_scm: false,
1587                })
1588                .into(),
1589            );
1590        }
1591
1592        // Symmetry with the active-listener census: exactly one ActivateListener
1593        // is emitted per active listener across the four maps, no more, no less.
1594        #[cfg(debug_assertions)]
1595        {
1596            let active_listeners = self.http_listeners.values().filter(|l| l.active).count()
1597                + self.https_listeners.values().filter(|l| l.active).count()
1598                + self.tcp_listeners.values().filter(|l| l.active).count()
1599                + self.udp_listeners.values().filter(|l| l.active).count();
1600            debug_assert_eq!(
1601                v.len(),
1602                active_listeners,
1603                "generate_activate_requests emits one request per active listener"
1604            );
1605            debug_assert!(
1606                v.iter()
1607                    .all(|r| matches!(r.request_type, Some(RequestType::ActivateListener(_)))),
1608                "generate_activate_requests must emit only ActivateListener requests"
1609            );
1610        }
1611
1612        v
1613    }
1614
1615    pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
1616        //pub tcp_listeners:   HashMap<SocketAddr, (TcpListener, bool)>,
1617        let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
1618        let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
1619        let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
1620        let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);
1621
1622        let my_udp_listeners: HashSet<&SocketAddr> = self.udp_listeners.keys().collect();
1623        let their_udp_listeners: HashSet<&SocketAddr> = other.udp_listeners.keys().collect();
1624        let removed_udp_listeners = my_udp_listeners.difference(&their_udp_listeners);
1625        let added_udp_listeners = their_udp_listeners.difference(&my_udp_listeners);
1626
1627        let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
1628        let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
1629        let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
1630        let added_http_listeners = their_http_listeners.difference(&my_http_listeners);
1631
1632        let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
1633        let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
1634        let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
1635        let added_https_listeners = their_https_listeners.difference(&my_https_listeners);
1636
1637        let mut v: Vec<Request> = vec![];
1638
1639        for address in removed_tcp_listeners {
1640            if self.tcp_listeners[*address].active {
1641                v.push(
1642                    RequestType::DeactivateListener(DeactivateListener {
1643                        address: SocketAddress::from(**address),
1644                        proxy: ListenerType::Tcp.into(),
1645                        to_scm: false,
1646                    })
1647                    .into(),
1648                );
1649            }
1650
1651            v.push(
1652                RequestType::RemoveListener(RemoveListener {
1653                    address: SocketAddress::from(**address),
1654                    proxy: ListenerType::Tcp.into(),
1655                })
1656                .into(),
1657            );
1658        }
1659
1660        for address in added_tcp_listeners.clone() {
1661            v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());
1662
1663            if other.tcp_listeners[*address].active {
1664                v.push(
1665                    RequestType::ActivateListener(ActivateListener {
1666                        address: SocketAddress::from(**address),
1667                        proxy: ListenerType::Tcp.into(),
1668                        from_scm: false,
1669                    })
1670                    .into(),
1671                );
1672            }
1673        }
1674
1675        for address in removed_udp_listeners {
1676            if self.udp_listeners[*address].active {
1677                v.push(
1678                    RequestType::DeactivateListener(DeactivateListener {
1679                        address: SocketAddress::from(**address),
1680                        proxy: ListenerType::Udp.into(),
1681                        to_scm: false,
1682                    })
1683                    .into(),
1684                );
1685            }
1686
1687            v.push(
1688                RequestType::RemoveListener(RemoveListener {
1689                    address: SocketAddress::from(**address),
1690                    proxy: ListenerType::Udp.into(),
1691                })
1692                .into(),
1693            );
1694        }
1695
1696        for address in added_udp_listeners.clone() {
1697            v.push(RequestType::AddUdpListener(other.udp_listeners[*address]).into());
1698
1699            if other.udp_listeners[*address].active {
1700                v.push(
1701                    RequestType::ActivateListener(ActivateListener {
1702                        address: SocketAddress::from(**address),
1703                        proxy: ListenerType::Udp.into(),
1704                        from_scm: false,
1705                    })
1706                    .into(),
1707                );
1708            }
1709        }
1710
1711        for address in removed_http_listeners {
1712            if self.http_listeners[*address].active {
1713                v.push(
1714                    RequestType::DeactivateListener(DeactivateListener {
1715                        address: SocketAddress::from(**address),
1716                        proxy: ListenerType::Http.into(),
1717                        to_scm: false,
1718                    })
1719                    .into(),
1720                );
1721            }
1722
1723            v.push(
1724                RequestType::RemoveListener(RemoveListener {
1725                    address: SocketAddress::from(**address),
1726                    proxy: ListenerType::Http.into(),
1727                })
1728                .into(),
1729            );
1730        }
1731
1732        for address in added_http_listeners.clone() {
1733            v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());
1734
1735            if other.http_listeners[*address].active {
1736                v.push(
1737                    RequestType::ActivateListener(ActivateListener {
1738                        address: SocketAddress::from(**address),
1739                        proxy: ListenerType::Http.into(),
1740                        from_scm: false,
1741                    })
1742                    .into(),
1743                );
1744            }
1745        }
1746
1747        for address in removed_https_listeners {
1748            if self.https_listeners[*address].active {
1749                v.push(
1750                    RequestType::DeactivateListener(DeactivateListener {
1751                        address: SocketAddress::from(**address),
1752                        proxy: ListenerType::Https.into(),
1753                        to_scm: false,
1754                    })
1755                    .into(),
1756                );
1757            }
1758
1759            v.push(
1760                RequestType::RemoveListener(RemoveListener {
1761                    address: SocketAddress::from(**address),
1762                    proxy: ListenerType::Https.into(),
1763                })
1764                .into(),
1765            );
1766        }
1767
1768        for address in added_https_listeners.clone() {
1769            v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());
1770
1771            if other.https_listeners[*address].active {
1772                v.push(
1773                    RequestType::ActivateListener(ActivateListener {
1774                        address: SocketAddress::from(**address),
1775                        proxy: ListenerType::Https.into(),
1776                        from_scm: false,
1777                    })
1778                    .into(),
1779                );
1780            }
1781        }
1782
1783        for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
1784            let my_listener = &self.tcp_listeners[*addr];
1785            let their_listener = &other.tcp_listeners[*addr];
1786
1787            if my_listener != their_listener {
1788                v.push(
1789                    RequestType::RemoveListener(RemoveListener {
1790                        address: SocketAddress::from(**addr),
1791                        proxy: ListenerType::Tcp.into(),
1792                    })
1793                    .into(),
1794                );
1795                // any added listener should be unactive
1796                let mut listener_to_add = *their_listener;
1797                listener_to_add.active = false;
1798                v.push(RequestType::AddTcpListener(listener_to_add).into());
1799
1800                // The Remove + Add(active=false) above wipes the listener's
1801                // active state. Re-emit an ActivateListener whenever the target
1802                // state keeps it active, otherwise a config change on a still
1803                // active listener would silently deactivate it on replay. This
1804                // subsumes the newly-active (`!my.active && their.active`) case:
1805                // a differing `active` flag always makes the listeners unequal.
1806                if their_listener.active {
1807                    v.push(
1808                        RequestType::ActivateListener(ActivateListener {
1809                            address: SocketAddress::from(**addr),
1810                            proxy: ListenerType::Tcp.into(),
1811                            from_scm: false,
1812                        })
1813                        .into(),
1814                    );
1815                }
1816            }
1817
1818            if my_listener.active && !their_listener.active {
1819                v.push(
1820                    RequestType::DeactivateListener(DeactivateListener {
1821                        address: SocketAddress::from(**addr),
1822                        proxy: ListenerType::Tcp.into(),
1823                        to_scm: false,
1824                    })
1825                    .into(),
1826                );
1827            }
1828        }
1829
1830        for addr in my_udp_listeners.intersection(&their_udp_listeners) {
1831            let my_listener = &self.udp_listeners[*addr];
1832            let their_listener = &other.udp_listeners[*addr];
1833
1834            if my_listener != their_listener {
1835                v.push(
1836                    RequestType::RemoveListener(RemoveListener {
1837                        address: SocketAddress::from(**addr),
1838                        proxy: ListenerType::Udp.into(),
1839                    })
1840                    .into(),
1841                );
1842                // any added listener should be unactive
1843                let mut listener_to_add = *their_listener;
1844                listener_to_add.active = false;
1845                v.push(RequestType::AddUdpListener(listener_to_add).into());
1846
1847                // The Remove + Add(active=false) above wipes the listener's
1848                // active state. Re-emit an ActivateListener whenever the target
1849                // state keeps it active, otherwise a config change on a still
1850                // active listener would silently deactivate it on replay. This
1851                // subsumes the newly-active (`!my.active && their.active`) case:
1852                // a differing `active` flag always makes the listeners unequal.
1853                if their_listener.active {
1854                    v.push(
1855                        RequestType::ActivateListener(ActivateListener {
1856                            address: SocketAddress::from(**addr),
1857                            proxy: ListenerType::Udp.into(),
1858                            from_scm: false,
1859                        })
1860                        .into(),
1861                    );
1862                }
1863            }
1864
1865            if my_listener.active && !their_listener.active {
1866                v.push(
1867                    RequestType::DeactivateListener(DeactivateListener {
1868                        address: SocketAddress::from(**addr),
1869                        proxy: ListenerType::Udp.into(),
1870                        to_scm: false,
1871                    })
1872                    .into(),
1873                );
1874            }
1875        }
1876
1877        for addr in my_http_listeners.intersection(&their_http_listeners) {
1878            let my_listener = &self.http_listeners[*addr];
1879            let their_listener = &other.http_listeners[*addr];
1880
1881            if my_listener != their_listener {
1882                v.push(
1883                    RequestType::RemoveListener(RemoveListener {
1884                        address: SocketAddress::from(**addr),
1885                        proxy: ListenerType::Http.into(),
1886                    })
1887                    .into(),
1888                );
1889                // any added listener should be unactive
1890                let mut listener_to_add = their_listener.clone();
1891                listener_to_add.active = false;
1892                v.push(RequestType::AddHttpListener(listener_to_add).into());
1893
1894                // The Remove + Add(active=false) above wipes the listener's
1895                // active state. Re-emit an ActivateListener whenever the target
1896                // state keeps it active, otherwise a config change on a still
1897                // active listener would silently deactivate it on replay. This
1898                // subsumes the newly-active (`!my.active && their.active`) case:
1899                // a differing `active` flag always makes the listeners unequal.
1900                if their_listener.active {
1901                    v.push(
1902                        RequestType::ActivateListener(ActivateListener {
1903                            address: SocketAddress::from(**addr),
1904                            proxy: ListenerType::Http.into(),
1905                            from_scm: false,
1906                        })
1907                        .into(),
1908                    );
1909                }
1910            }
1911
1912            if my_listener.active && !their_listener.active {
1913                v.push(
1914                    RequestType::DeactivateListener(DeactivateListener {
1915                        address: SocketAddress::from(**addr),
1916                        proxy: ListenerType::Http.into(),
1917                        to_scm: false,
1918                    })
1919                    .into(),
1920                );
1921            }
1922        }
1923
1924        for addr in my_https_listeners.intersection(&their_https_listeners) {
1925            let my_listener = &self.https_listeners[*addr];
1926            let their_listener = &other.https_listeners[*addr];
1927
1928            if my_listener != their_listener {
1929                v.push(
1930                    RequestType::RemoveListener(RemoveListener {
1931                        address: SocketAddress::from(**addr),
1932                        proxy: ListenerType::Https.into(),
1933                    })
1934                    .into(),
1935                );
1936                // any added listener should be unactive
1937                let mut listener_to_add = their_listener.clone();
1938                listener_to_add.active = false;
1939                v.push(RequestType::AddHttpsListener(listener_to_add).into());
1940
1941                // The Remove + Add(active=false) above wipes the listener's
1942                // active state. Re-emit an ActivateListener whenever the target
1943                // state keeps it active, otherwise a config change on a still
1944                // active listener would silently deactivate it on replay. This
1945                // subsumes the newly-active (`!my.active && their.active`) case:
1946                // a differing `active` flag always makes the listeners unequal.
1947                if their_listener.active {
1948                    v.push(
1949                        RequestType::ActivateListener(ActivateListener {
1950                            address: SocketAddress::from(**addr),
1951                            proxy: ListenerType::Https.into(),
1952                            from_scm: false,
1953                        })
1954                        .into(),
1955                    );
1956                }
1957            }
1958
1959            if my_listener.active && !their_listener.active {
1960                v.push(
1961                    RequestType::DeactivateListener(DeactivateListener {
1962                        address: SocketAddress::from(**addr),
1963                        proxy: ListenerType::Https.into(),
1964                        to_scm: false,
1965                    })
1966                    .into(),
1967                );
1968            }
1969        }
1970
1971        for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
1972            match res {
1973                DiffResult::Added | DiffResult::Changed => v.push(
1974                    RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
1975                ),
1976                DiffResult::Removed => {
1977                    v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
1978                }
1979            }
1980        }
1981
1982        for ((cluster_id, backend_id), res) in diff_map(
1983            self.backends.iter().flat_map(|(cluster_id, v)| {
1984                v.iter()
1985                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1986            }),
1987            other.backends.iter().flat_map(|(cluster_id, v)| {
1988                v.iter()
1989                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1990            }),
1991        ) {
1992            match res {
1993                DiffResult::Added => {
1994                    let backend = other
1995                        .backends
1996                        .get(cluster_id)
1997                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1998                        .unwrap();
1999                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
2000                }
2001                DiffResult::Removed => {
2002                    let backend = self
2003                        .backends
2004                        .get(cluster_id)
2005                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2006                        .unwrap();
2007
2008                    v.push(
2009                        RequestType::RemoveBackend(RemoveBackend {
2010                            cluster_id: backend.cluster_id.clone(),
2011                            backend_id: backend.backend_id.clone(),
2012                            address: SocketAddress::from(backend.address),
2013                        })
2014                        .into(),
2015                    );
2016                }
2017                DiffResult::Changed => {
2018                    let backend = self
2019                        .backends
2020                        .get(cluster_id)
2021                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2022                        .unwrap();
2023
2024                    v.push(
2025                        RequestType::RemoveBackend(RemoveBackend {
2026                            cluster_id: backend.cluster_id.clone(),
2027                            backend_id: backend.backend_id.clone(),
2028                            address: SocketAddress::from(backend.address),
2029                        })
2030                        .into(),
2031                    );
2032
2033                    let backend = other
2034                        .backends
2035                        .get(cluster_id)
2036                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2037                        .unwrap();
2038                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
2039                }
2040            }
2041        }
2042
2043        let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
2044        for (route, front) in self.http_fronts.iter() {
2045            my_http_fronts.insert((route, front));
2046        }
2047        let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
2048        for (route, front) in other.http_fronts.iter() {
2049            their_http_fronts.insert((route, front));
2050        }
2051
2052        let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
2053        let added_http_fronts = their_http_fronts.difference(&my_http_fronts);
2054
2055        for &(_, front) in removed_http_fronts {
2056            v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
2057        }
2058
2059        for &(_, front) in added_http_fronts {
2060            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
2061        }
2062
2063        let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
2064        for (route, front) in self.https_fronts.iter() {
2065            my_https_fronts.insert((route, front));
2066        }
2067        let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
2068        for (route, front) in other.https_fronts.iter() {
2069            their_https_fronts.insert((route, front));
2070        }
2071        let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
2072        let added_https_fronts = their_https_fronts.difference(&my_https_fronts);
2073
2074        for &(_, front) in removed_https_fronts {
2075            v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
2076        }
2077
2078        for &(_, front) in added_https_fronts {
2079            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
2080        }
2081
2082        let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
2083        for (cluster_id, front_list) in self.tcp_fronts.iter() {
2084            for front in front_list.iter() {
2085                my_tcp_fronts.insert((cluster_id, front));
2086            }
2087        }
2088        let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
2089        for (cluster_id, front_list) in other.tcp_fronts.iter() {
2090            for front in front_list.iter() {
2091                their_tcp_fronts.insert((cluster_id, front));
2092            }
2093        }
2094
2095        let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
2096        let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);
2097
2098        for &(_, front) in removed_tcp_fronts {
2099            v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
2100        }
2101
2102        for &(_, front) in added_tcp_fronts {
2103            v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
2104        }
2105
2106        let mut my_udp_fronts: HashSet<(&ClusterId, &UdpFrontend)> = HashSet::new();
2107        for (cluster_id, front_list) in self.udp_fronts.iter() {
2108            for front in front_list.iter() {
2109                my_udp_fronts.insert((cluster_id, front));
2110            }
2111        }
2112        let mut their_udp_fronts: HashSet<(&ClusterId, &UdpFrontend)> = HashSet::new();
2113        for (cluster_id, front_list) in other.udp_fronts.iter() {
2114            for front in front_list.iter() {
2115                their_udp_fronts.insert((cluster_id, front));
2116            }
2117        }
2118
2119        let removed_udp_fronts = my_udp_fronts.difference(&their_udp_fronts);
2120        let added_udp_fronts = their_udp_fronts.difference(&my_udp_fronts);
2121
2122        for &(_, front) in removed_udp_fronts {
2123            v.push(RequestType::RemoveUdpFrontend(front.clone().into()).into());
2124        }
2125
2126        for &(_, front) in added_udp_fronts {
2127            v.push(RequestType::AddUdpFrontend(front.clone().into()).into());
2128        }
2129
2130        //pub certificates:    HashMap<SocketAddr, HashMap<CertificateFingerprint, (CertificateAndKey, Vec<String>)>>,
2131        let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
2132            self.certificates
2133                .iter()
2134                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
2135        );
2136        let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
2137            other
2138                .certificates
2139                .iter()
2140                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
2141        );
2142
2143        let removed_certificates = my_certificates.difference(&their_certificates);
2144        let added_certificates = their_certificates.difference(&my_certificates);
2145
2146        for &(address, fingerprint) in removed_certificates {
2147            v.push(
2148                RequestType::RemoveCertificate(RemoveCertificate {
2149                    address: SocketAddress::from(address),
2150                    fingerprint: fingerprint.to_string(),
2151                })
2152                .into(),
2153            );
2154        }
2155
2156        for &(address, fingerprint) in added_certificates {
2157            if let Some(certificate_and_key) = other
2158                .certificates
2159                .get(&address)
2160                .and_then(|certs| certs.get(fingerprint))
2161            {
2162                v.push(
2163                    RequestType::AddCertificate(AddCertificate {
2164                        address: SocketAddress::from(address),
2165                        certificate: certificate_and_key.clone(),
2166                        expired_at: None,
2167                    })
2168                    .into(),
2169                );
2170            }
2171        }
2172
2173        for address in added_tcp_listeners {
2174            let listener = &other.tcp_listeners[*address];
2175            if listener.active {
2176                v.push(
2177                    RequestType::ActivateListener(ActivateListener {
2178                        address: listener.address,
2179                        proxy: ListenerType::Tcp.into(),
2180                        from_scm: false,
2181                    })
2182                    .into(),
2183                );
2184            }
2185        }
2186
2187        for address in added_udp_listeners {
2188            let listener = &other.udp_listeners[*address];
2189            if listener.active {
2190                v.push(
2191                    RequestType::ActivateListener(ActivateListener {
2192                        address: listener.address,
2193                        proxy: ListenerType::Udp.into(),
2194                        from_scm: false,
2195                    })
2196                    .into(),
2197                );
2198            }
2199        }
2200
2201        // Replay symmetry: the request set `diff` emits, when replayed onto a
2202        // clone of `self`, must reproduce `other`'s routing-relevant maps —
2203        // listeners included, with their `active` flag. This is the property the
2204        // hot-reconfig fan-out relies on — a worker applies these requests and
2205        // must converge on `other`. We verify it in debug by actually replaying;
2206        // `dispatch`'s own invariant sweep runs on every step.
2207        //
2208        // One deliberate normalization: `backends`/`tcp_fronts` are compared
2209        // after dropping empty buckets. `remove_backend`/`remove_tcp_frontend`
2210        // leave an empty `Vec` under a cluster key, whereas `other` may have no
2211        // key at all. An empty bucket emits no requests and is semantically
2212        // equivalent to an absent one, so we normalize it away before comparing.
2213        #[cfg(debug_assertions)]
2214        {
2215            let mut replayed = self.clone();
2216            for request in &v {
2217                // A diff request must always be dispatchable onto `self`.
2218                debug_assert!(
2219                    replayed.dispatch(request).is_ok(),
2220                    "every request emitted by diff must replay cleanly onto self"
2221                );
2222            }
2223            let nonempty = |m: &BTreeMap<ClusterId, Vec<Backend>>| {
2224                m.iter()
2225                    .filter(|(_, v)| !v.is_empty())
2226                    .map(|(k, v)| (k.clone(), v.clone()))
2227                    .collect::<BTreeMap<_, _>>()
2228            };
2229            let nonempty_tcp = |m: &HashMap<ClusterId, Vec<TcpFrontend>>| {
2230                m.iter()
2231                    .filter(|(_, v)| !v.is_empty())
2232                    .map(|(k, v)| (k.clone(), v.clone()))
2233                    .collect::<HashMap<_, _>>()
2234            };
2235            debug_assert!(
2236                replayed.clusters == other.clusters
2237                    && nonempty(&replayed.backends) == nonempty(&other.backends)
2238                    && replayed.http_fronts == other.http_fronts
2239                    && replayed.https_fronts == other.https_fronts
2240                    && nonempty_tcp(&replayed.tcp_fronts) == nonempty_tcp(&other.tcp_fronts)
2241                    && replayed.certificates == other.certificates
2242                    && replayed.http_listeners == other.http_listeners
2243                    && replayed.https_listeners == other.https_listeners
2244                    && replayed.tcp_listeners == other.tcp_listeners
2245                    && replayed.udp_listeners == other.udp_listeners,
2246                "replaying diff(self, other) onto self must reproduce other's clusters/backends/frontends/certificates/listeners"
2247            );
2248        }
2249
2250        v
2251    }
2252
2253    // FIXME: what about deny rules?
2254    pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
2255        let mut hm: HashMap<ClusterId, DefaultHasher> = self
2256            .clusters
2257            .keys()
2258            .map(|cluster_id| {
2259                let mut hasher = DefaultHasher::new();
2260                self.clusters.get(cluster_id).hash(&mut hasher);
2261                if let Some(backends) = self.backends.get(cluster_id) {
2262                    backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
2263                }
2264                if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
2265                    tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
2266                }
2267                (cluster_id.to_owned(), hasher)
2268            })
2269            .collect();
2270
2271        for front in self.http_fronts.values() {
2272            if let Some(cluster_id) = &front.cluster_id {
2273                if let Some(hasher) = hm.get_mut(cluster_id) {
2274                    front.hash(hasher);
2275                }
2276            }
2277        }
2278
2279        for front in self.https_fronts.values() {
2280            if let Some(cluster_id) = &front.cluster_id {
2281                if let Some(hasher) = hm.get_mut(cluster_id) {
2282                    front.hash(hasher);
2283                }
2284            }
2285        }
2286
2287        hm.drain()
2288            .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
2289            .collect()
2290    }
2291
2292    /// Gives details about a given cluster.
2293    /// Types like `HttpFrontend` are converted into protobuf ones, like `RequestHttpFrontend`
2294    pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
2295        let configuration = self.clusters.get(cluster_id).cloned()?;
2296        info!("{:?}", configuration);
2297
2298        let http_frontends: Vec<RequestHttpFrontend> = self
2299            .http_fronts
2300            .values()
2301            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
2302            .map(|front| front.clone().into())
2303            .collect();
2304
2305        let https_frontends: Vec<RequestHttpFrontend> = self
2306            .https_fronts
2307            .values()
2308            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
2309            .map(|front| front.clone().into())
2310            .collect();
2311
2312        let tcp_frontends: Vec<RequestTcpFrontend> = self
2313            .tcp_fronts
2314            .get(cluster_id)
2315            .cloned()
2316            .unwrap_or_default()
2317            .iter()
2318            .map(|front| front.clone().into())
2319            .collect();
2320
2321        let udp_frontends: Vec<RequestUdpFrontend> = self
2322            .udp_fronts
2323            .get(cluster_id)
2324            .cloned()
2325            .unwrap_or_default()
2326            .iter()
2327            .map(|front| front.clone().into())
2328            .collect();
2329
2330        let backends: Vec<AddBackend> = self
2331            .backends
2332            .get(cluster_id)
2333            .cloned()
2334            .unwrap_or_default()
2335            .iter()
2336            .map(|backend| backend.clone().into())
2337            .collect();
2338
2339        Some(ClusterInformation {
2340            configuration: Some(configuration),
2341            http_frontends,
2342            https_frontends,
2343            tcp_frontends,
2344            backends,
2345            udp_frontends,
2346        })
2347    }
2348
2349    pub fn count_backends(&self) -> usize {
2350        self.backends.values().fold(0, |acc, v| acc + v.len())
2351    }
2352
2353    pub fn count_frontends(&self) -> usize {
2354        self.http_fronts.values().count()
2355            + self.https_fronts.values().count()
2356            + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
2357            + self.udp_fronts.values().fold(0, |acc, v| acc + v.len())
2358    }
2359
2360    pub fn get_cluster_ids_by_domain(
2361        &self,
2362        hostname: String,
2363        path: Option<String>,
2364    ) -> HashSet<ClusterId> {
2365        let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
2366
2367        self.http_fronts.values().for_each(|front| {
2368            if domain_check(&front.hostname, &front.path, &hostname, &path) {
2369                if let Some(id) = &front.cluster_id {
2370                    cluster_ids.insert(id.to_string());
2371                }
2372            }
2373        });
2374
2375        self.https_fronts.values().for_each(|front| {
2376            if domain_check(&front.hostname, &front.path, &hostname, &path) {
2377                if let Some(id) = &front.cluster_id {
2378                    cluster_ids.insert(id.to_string());
2379                }
2380            }
2381        });
2382
2383        cluster_ids
2384    }
2385
2386    pub fn get_certificates(
2387        &self,
2388        filters: QueryCertificatesFilters,
2389    ) -> BTreeMap<String, CertificateAndKey> {
2390        self.certificates
2391            .values()
2392            .flat_map(|hash_map| hash_map.iter())
2393            .filter(|(fingerprint, cert)| {
2394                if let Some(domain) = &filters.domain {
2395                    cert.names.contains(domain)
2396                } else if let Some(f) = &filters.fingerprint {
2397                    fingerprint.to_string() == *f
2398                } else {
2399                    true
2400                }
2401            })
2402            .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
2403            .collect()
2404    }
2405
2406    pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
2407        // if no http / https / tcp filter is provided, list all of them
2408        let list_all = !filters.http && !filters.https && !filters.tcp;
2409
2410        let mut listed_frontends = ListedFrontends::default();
2411
2412        if filters.http || list_all {
2413            for http_frontend in self.http_fronts.iter().filter(|f| {
2414                if let Some(domain) = &filters.domain {
2415                    f.1.hostname.contains(domain)
2416                } else {
2417                    true
2418                }
2419            }) {
2420                listed_frontends
2421                    .http_frontends
2422                    .push(http_frontend.1.to_owned().into());
2423            }
2424        }
2425
2426        if filters.https || list_all {
2427            for https_frontend in self.https_fronts.iter().filter(|f| {
2428                if let Some(domain) = &filters.domain {
2429                    f.1.hostname.contains(domain)
2430                } else {
2431                    true
2432                }
2433            }) {
2434                listed_frontends
2435                    .https_frontends
2436                    .push(https_frontend.1.to_owned().into());
2437            }
2438        }
2439
2440        if (filters.tcp || list_all) && filters.domain.is_none() {
2441            for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
2442                listed_frontends
2443                    .tcp_frontends
2444                    .push(tcp_frontend.to_owned().into())
2445            }
2446        }
2447
2448        // `FrontendFilters` has no dedicated `udp` flag, so UDP frontends ride
2449        // the same default/all-pass path as TCP: surfaced when no protocol
2450        // filter is set (`list_all`) or when the `tcp` filter is requested.
2451        // Datagram frontends carry no hostname, so a `domain` filter excludes
2452        // them (matching the TCP branch).
2453        if (filters.tcp || list_all) && filters.domain.is_none() {
2454            for udp_frontend in self.udp_fronts.values().flat_map(|v| v.iter()) {
2455                listed_frontends
2456                    .udp_frontends
2457                    .push(udp_frontend.to_owned().into())
2458            }
2459        }
2460
2461        listed_frontends
2462    }
2463
2464    pub fn list_listeners(&self) -> ListenersList {
2465        ListenersList {
2466            http_listeners: self
2467                .http_listeners
2468                .iter()
2469                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
2470                .collect(),
2471            https_listeners: self
2472                .https_listeners
2473                .iter()
2474                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
2475                .collect(),
2476            tcp_listeners: self
2477                .tcp_listeners
2478                .iter()
2479                .map(|(addr, listener)| (addr.to_string(), *listener))
2480                .collect(),
2481            udp_listeners: self
2482                .udp_listeners
2483                .iter()
2484                .map(|(addr, listener)| (addr.to_string(), *listener))
2485                .collect(),
2486        }
2487    }
2488
2489    // create requests needed for a worker to recreate the state
2490    pub fn produce_initial_state(&self) -> InitialState {
2491        let mut worker_requests = Vec::new();
2492        for (counter, request) in self.generate_requests().into_iter().enumerate() {
2493            worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
2494        }
2495        InitialState {
2496            requests: worker_requests,
2497        }
2498    }
2499
2500    /// generate requests necessary to recreate the state,
2501    /// in protobuf, to a temp file
2502    pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
2503        let initial_state = self.produce_initial_state();
2504        let count = initial_state.requests.len();
2505
2506        let bytes_to_write = initial_state.encode_to_vec();
2507        println!("writing {} in the temp file", bytes_to_write.len());
2508        file.write_all(&bytes_to_write)
2509            .map_err(StateError::FileError)?;
2510
2511        file.sync_all().map_err(StateError::FileError)?;
2512
2513        Ok(count)
2514    }
2515
2516    /// generate requests necessary to recreate the state,
2517    /// write them in a JSON form in a file, separated by \n\0,
2518    /// returns the number of written requests
2519    pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
2520        let mut counter = 0usize;
2521        let requests = self.generate_requests();
2522
2523        for request in requests {
2524            let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
2525
2526            file.write_all(
2527                &serde_json::to_string(&message)
2528                    .map(|s| s.into_bytes())
2529                    .unwrap_or_default(),
2530            )
2531            .map_err(StateError::FileError)?;
2532
2533            file.write_all(&b"\n\0"[..])
2534                .map_err(StateError::FileError)?;
2535
2536            if counter % 1000 == 0 {
2537                info!("writing {} commands to file", counter);
2538                file.sync_all().map_err(StateError::FileError)?;
2539            }
2540            counter += 1;
2541        }
2542        file.sync_all().map_err(StateError::FileError)?;
2543
2544        Ok(counter)
2545    }
2546}
2547
2548/// Validate all H2 flood knobs in an HTTP listener patch.
2549///
2550/// Every flood-detector knob (including stream-0 WINDOW_UPDATE) requires a
2551/// value `>= 1`. Passing `0` would disable the detector entirely and leave the
2552/// proxy open to CVE-2023-44487 and related attacks. The runtime constructor
2553/// `H2FloodConfig::new()` applies the same `.max(1)` clamping, but a raw
2554/// protobuf client can bypass the CLI layer, so we enforce the bound here too.
2555///
2556/// `h2_max_concurrent_streams` and `h2_stream_shrink_ratio` are connection-
2557/// config knobs that also require `>= 1`.
2558///
2559/// `h2_graceful_shutdown_deadline_seconds = 0` is intentionally **allowed** —
2560/// it means "wait forever (no forced close after GOAWAY)".
2561pub fn validate_h2_flood_knobs_http(patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
2562    macro_rules! require_ge1 {
2563        ($field:expr, $name:literal) => {
2564            if let Some(0) = $field {
2565                return Err(StateError::InvalidValue {
2566                    field: $name,
2567                    reason: "must be >= 1",
2568                });
2569            }
2570        };
2571    }
2572    require_ge1!(
2573        patch.h2_max_rst_stream_per_window,
2574        "h2_max_rst_stream_per_window"
2575    );
2576    require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
2577    require_ge1!(
2578        patch.h2_max_settings_per_window,
2579        "h2_max_settings_per_window"
2580    );
2581    require_ge1!(
2582        patch.h2_max_empty_data_per_window,
2583        "h2_max_empty_data_per_window"
2584    );
2585    require_ge1!(
2586        patch.h2_max_continuation_frames,
2587        "h2_max_continuation_frames"
2588    );
2589    require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
2590    require_ge1!(
2591        patch.h2_max_window_update_stream0_per_window,
2592        "h2_max_window_update_stream0_per_window"
2593    );
2594    require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
2595    // Shrink ratio runtime floor is 2 (lib/src/protocol/mux/h2.rs ~448 .max(2));
2596    // anything lower is silently promoted so reject at control plane.
2597    if let Some(v) = patch.h2_stream_shrink_ratio {
2598        if v < 2 {
2599            return Err(StateError::InvalidValue {
2600                field: "h2_stream_shrink_ratio",
2601                reason: "must be >= 2",
2602            });
2603        }
2604    }
2605    // Lifetime caps and HPACK limits — must be >= 1 or the runtime trips on the
2606    // first qualifying frame. doc/configure.md advertises "u64 (>= 1)" etc.
2607    require_ge1!(
2608        patch.h2_max_rst_stream_lifetime,
2609        "h2_max_rst_stream_lifetime"
2610    );
2611    require_ge1!(
2612        patch.h2_max_rst_stream_abusive_lifetime,
2613        "h2_max_rst_stream_abusive_lifetime"
2614    );
2615    require_ge1!(
2616        patch.h2_max_rst_stream_emitted_lifetime,
2617        "h2_max_rst_stream_emitted_lifetime"
2618    );
2619    require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
2620    require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
2621    require_ge1!(patch.h2_max_header_fields, "h2_max_header_fields");
2622    Ok(())
2623}
2624
2625/// Validate all H2 flood knobs in an HTTPS listener patch (same rules as HTTP).
2626pub fn validate_h2_flood_knobs_https(patch: &UpdateHttpsListenerConfig) -> Result<(), StateError> {
2627    macro_rules! require_ge1 {
2628        ($field:expr, $name:literal) => {
2629            if let Some(0) = $field {
2630                return Err(StateError::InvalidValue {
2631                    field: $name,
2632                    reason: "must be >= 1",
2633                });
2634            }
2635        };
2636    }
2637    require_ge1!(
2638        patch.h2_max_rst_stream_per_window,
2639        "h2_max_rst_stream_per_window"
2640    );
2641    require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
2642    require_ge1!(
2643        patch.h2_max_settings_per_window,
2644        "h2_max_settings_per_window"
2645    );
2646    require_ge1!(
2647        patch.h2_max_empty_data_per_window,
2648        "h2_max_empty_data_per_window"
2649    );
2650    require_ge1!(
2651        patch.h2_max_continuation_frames,
2652        "h2_max_continuation_frames"
2653    );
2654    require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
2655    require_ge1!(
2656        patch.h2_max_window_update_stream0_per_window,
2657        "h2_max_window_update_stream0_per_window"
2658    );
2659    require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
2660    if let Some(v) = patch.h2_stream_shrink_ratio {
2661        if v < 2 {
2662            return Err(StateError::InvalidValue {
2663                field: "h2_stream_shrink_ratio",
2664                reason: "must be >= 2",
2665            });
2666        }
2667    }
2668    require_ge1!(
2669        patch.h2_max_rst_stream_lifetime,
2670        "h2_max_rst_stream_lifetime"
2671    );
2672    require_ge1!(
2673        patch.h2_max_rst_stream_abusive_lifetime,
2674        "h2_max_rst_stream_abusive_lifetime"
2675    );
2676    require_ge1!(
2677        patch.h2_max_rst_stream_emitted_lifetime,
2678        "h2_max_rst_stream_emitted_lifetime"
2679    );
2680    require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
2681    require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
2682    require_ge1!(patch.h2_max_header_fields, "h2_max_header_fields");
2683    Ok(())
2684}
2685
2686/// Validate a `sozu_id_header` value against RFC 9110 §5.1 header-name grammar.
2687///
2688/// Rejects empty strings and strings containing CR, LF, colon, space, or tab —
2689/// a conservative approximation of the token grammar that covers all practical
2690/// injection vectors without a full RFC 9110 tokenizer.
2691/// Merge a `CustomHttpAnswers` patch into the listener's stored answers,
2692/// preserving any field not present in the patch.
2693///
2694/// Field-mask semantic: `None` in `patch` means "preserve", `Some` means
2695/// "replace". A no-op patch (all-None) leaves `target` untouched. If `target`
2696/// is currently `None`, initialize it from the patch (any `None` field stays
2697/// `None` so hot-upgrade replay sees the same partial state).
2698pub fn merge_custom_http_answers(
2699    target: &mut Option<CustomHttpAnswers>,
2700    patch: &CustomHttpAnswers,
2701) {
2702    let current = target.get_or_insert_with(CustomHttpAnswers::default);
2703    macro_rules! merge_field {
2704        ($field:ident) => {
2705            if let Some(ref v) = patch.$field {
2706                current.$field = Some(v.clone());
2707            }
2708        };
2709    }
2710    merge_field!(answer_301);
2711    merge_field!(answer_400);
2712    merge_field!(answer_401);
2713    merge_field!(answer_404);
2714    merge_field!(answer_408);
2715    merge_field!(answer_413);
2716    merge_field!(answer_421);
2717    merge_field!(answer_502);
2718    merge_field!(answer_503);
2719    merge_field!(answer_504);
2720    merge_field!(answer_507);
2721}
2722
2723/// Validate an `AlpnProtocols` patch: each value must be "h2" or "http/1.1".
2724/// Empty values vec is allowed (reset-to-default).
2725pub fn validate_alpn_protocols(values: &[String]) -> Result<(), StateError> {
2726    for value in values {
2727        if value != "h2" && value != "http/1.1" {
2728            return Err(StateError::InvalidValue {
2729                field: "alpn_protocols",
2730                reason: "each value must be \"h2\" or \"http/1.1\"",
2731            });
2732        }
2733    }
2734    Ok(())
2735}
2736
2737/// Validate a `sozu_id_header` value against the RFC 9110 §5.1 `token` grammar:
2738///
2739/// ```text
2740/// token  = 1*tchar
2741/// tchar  = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
2742///          "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
2743/// ```
2744///
2745/// Rejects empty strings, non-ASCII bytes, controls (including CR/LF/tab),
2746/// separators (including colon and space), and any other non-`tchar` byte.
2747pub fn validate_sozu_id_header(value: &str) -> Result<(), StateError> {
2748    if value.is_empty() {
2749        return Err(StateError::InvalidValue {
2750            field: "sozu_id_header",
2751            reason: "must not be empty",
2752        });
2753    }
2754    for b in value.bytes() {
2755        let is_tchar = b.is_ascii_alphanumeric()
2756            || matches!(
2757                b,
2758                b'!' | b'#'
2759                    | b'$'
2760                    | b'%'
2761                    | b'&'
2762                    | b'\''
2763                    | b'*'
2764                    | b'+'
2765                    | b'-'
2766                    | b'.'
2767                    | b'^'
2768                    | b'_'
2769                    | b'`'
2770                    | b'|'
2771                    | b'~'
2772            );
2773        if !is_tchar {
2774            return Err(StateError::InvalidValue {
2775                field: "sozu_id_header",
2776                reason: "must be a valid HTTP header name (RFC 9110 §5.1 token: alphanumeric or one of !#$%&'*+-.^_`|~)",
2777            });
2778        }
2779    }
2780    Ok(())
2781}
2782
2783fn domain_check(
2784    front_hostname: &str,
2785    front_path_rule: &PathRule,
2786    hostname: &str,
2787    path_prefix: &Option<String>,
2788) -> bool {
2789    if hostname != front_hostname {
2790        return false;
2791    }
2792
2793    if let Some(path) = &path_prefix {
2794        return path == &front_path_rule.value;
2795    }
2796
2797    true
2798}
2799
2800struct DiffMap<'a, K: Ord, V, I1, I2> {
2801    my_it: I1,
2802    other_it: I2,
2803    my: Option<(K, &'a V)>,
2804    other: Option<(K, &'a V)>,
2805}
2806
2807//fn diff_map<'a, K:Ord, V: PartialEq>(my: &'a BTreeMap<K,V>, other: &'a BTreeMap<K,V>) -> DiffMap<'a,K,V> {
2808fn diff_map<
2809    'a,
2810    K: Ord,
2811    V: PartialEq,
2812    I1: Iterator<Item = (K, &'a V)>,
2813    I2: Iterator<Item = (K, &'a V)>,
2814>(
2815    my: I1,
2816    other: I2,
2817) -> DiffMap<'a, K, V, I1, I2> {
2818    DiffMap {
2819        my_it: my,
2820        other_it: other,
2821        my: None,
2822        other: None,
2823    }
2824}
2825
2826enum DiffResult {
2827    Added,
2828    Removed,
2829    Changed,
2830}
2831
2832// this will iterate over the keys of both iterators
2833// since keys are sorted, it should be easy to see which ones are in common or not
2834impl<'a, K: Ord, V: PartialEq, I1: Iterator<Item = (K, &'a V)>, I2: Iterator<Item = (K, &'a V)>>
2835    std::iter::Iterator for DiffMap<'a, K, V, I1, I2>
2836{
2837    type Item = (K, DiffResult);
2838
2839    fn next(&mut self) -> Option<Self::Item> {
2840        loop {
2841            if self.my.is_none() {
2842                self.my = self.my_it.next();
2843            }
2844            if self.other.is_none() {
2845                self.other = self.other_it.next();
2846            }
2847
2848            match (self.my.take(), self.other.take()) {
2849                // there are no more elements in my_it, all the next elements in other
2850                // should be added
2851                // if other was none, we will stop the iterator there
2852                (None, other) => return other.map(|(k, _)| (k, DiffResult::Added)),
2853                // there are no more elements in other_it, all the next elements in my
2854                // should be removed
2855                (Some((k, _)), None) => return Some((k, DiffResult::Removed)),
2856                // element is present in my but not other
2857                (Some((k1, _v1)), Some((k2, v2))) if k1 < k2 => {
2858                    self.other = Some((k2, v2));
2859                    return Some((k1, DiffResult::Removed));
2860                }
2861                // element is present in other byt not in my
2862                (Some((k1, v1)), Some((k2, _v2))) if k1 > k2 => {
2863                    self.my = Some((k1, v1));
2864                    return Some((k2, DiffResult::Added));
2865                }
2866                (Some((k1, v1)), Some((_k2, v2))) if v1 != v2 => {
2867                    // key is present in both, if elements have changed
2868                    // return a value, otherwise go to the next key for both maps
2869                    return Some((k1, DiffResult::Changed));
2870                }
2871                _ => {}
2872            }
2873        }
2874    }
2875}
2876
2877#[cfg(test)]
2878mod tests {
2879    use rand::{RngExt, rng, seq::SliceRandom};
2880
2881    use super::*;
2882    use crate::proto::command::{
2883        CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RequestTcpFrontend,
2884        RequestUdpFrontend, RulePosition, UdpListenerConfig, UpdateUdpListenerConfig,
2885    };
2886
2887    #[test]
2888    fn serialize() {
2889        let mut state: ConfigState = Default::default();
2890        state
2891            .dispatch(
2892                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2893                    cluster_id: Some(String::from("cluster_1")),
2894                    hostname: String::from("lolcatho.st:8080"),
2895                    path: PathRule::prefix(String::from("/")),
2896                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2897                    position: RulePosition::Tree.into(),
2898                    ..Default::default()
2899                })
2900                .into(),
2901            )
2902            .expect("Could not execute request");
2903        state
2904            .dispatch(
2905                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2906                    cluster_id: Some(String::from("cluster_2")),
2907                    hostname: String::from("test.local"),
2908                    path: PathRule::prefix(String::from("/abc")),
2909                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2910                    position: RulePosition::Pre.into(),
2911                    ..Default::default()
2912                })
2913                .into(),
2914            )
2915            .expect("Could not execute request");
2916        state
2917            .dispatch(
2918                &RequestType::AddBackend(AddBackend {
2919                    cluster_id: String::from("cluster_1"),
2920                    backend_id: String::from("cluster_1-0"),
2921                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2922                    ..Default::default()
2923                })
2924                .into(),
2925            )
2926            .expect("Could not execute request");
2927        state
2928            .dispatch(
2929                &RequestType::AddBackend(AddBackend {
2930                    cluster_id: String::from("cluster_1"),
2931                    backend_id: String::from("cluster_1-1"),
2932                    address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
2933                    ..Default::default()
2934                })
2935                .into(),
2936            )
2937            .expect("Could not execute request");
2938        state
2939            .dispatch(
2940                &RequestType::AddBackend(AddBackend {
2941                    cluster_id: String::from("cluster_2"),
2942                    backend_id: String::from("cluster_2-0"),
2943                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2944                    ..Default::default()
2945                })
2946                .into(),
2947            )
2948            .expect("Could not execute request");
2949        state
2950            .dispatch(
2951                &RequestType::AddBackend(AddBackend {
2952                    cluster_id: String::from("cluster_1"),
2953                    backend_id: String::from("cluster_1-3"),
2954                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2955                    ..Default::default()
2956                })
2957                .into(),
2958            )
2959            .expect("Could not execute request");
2960        state
2961            .dispatch(
2962                &RequestType::RemoveBackend(RemoveBackend {
2963                    cluster_id: String::from("cluster_1"),
2964                    backend_id: String::from("cluster_1-3"),
2965                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2966                })
2967                .into(),
2968            )
2969            .expect("Could not execute request");
2970
2971        /*
2972        let encoded = state.encode();
2973        println!("serialized:\n{}", encoded);
2974
2975        let new_state: Option<HttpProxy> = decode_str(&encoded);
2976        println!("deserialized:\n{:?}", new_state);
2977        assert_eq!(new_state, Some(state));
2978        */
2979        //assert!(false);
2980    }
2981
2982    #[test]
2983    fn diff() {
2984        let mut state: ConfigState = Default::default();
2985        state
2986            .dispatch(
2987                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2988                    cluster_id: Some(String::from("cluster_1")),
2989                    hostname: String::from("lolcatho.st:8080"),
2990                    path: PathRule::prefix(String::from("/")),
2991                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2992                    position: RulePosition::Post.into(),
2993                    ..Default::default()
2994                })
2995                .into(),
2996            )
2997            .expect("Could not execute request");
2998        state
2999            .dispatch(
3000                &RequestType::AddHttpFrontend(RequestHttpFrontend {
3001                    cluster_id: Some(String::from("cluster_2")),
3002                    hostname: String::from("test.local"),
3003                    path: PathRule::prefix(String::from("/abc")),
3004                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3005                    ..Default::default()
3006                })
3007                .into(),
3008            )
3009            .expect("Could not execute request");
3010        state
3011            .dispatch(
3012                &RequestType::AddBackend(AddBackend {
3013                    cluster_id: String::from("cluster_1"),
3014                    backend_id: String::from("cluster_1-0"),
3015                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3016                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3017                    ..Default::default()
3018                })
3019                .into(),
3020            )
3021            .expect("Could not execute request");
3022        state
3023            .dispatch(
3024                &RequestType::AddBackend(AddBackend {
3025                    cluster_id: String::from("cluster_1"),
3026                    backend_id: String::from("cluster_1-1"),
3027                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
3028                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3029                    ..Default::default()
3030                })
3031                .into(),
3032            )
3033            .expect("Could not execute request");
3034        state
3035            .dispatch(
3036                &RequestType::AddBackend(AddBackend {
3037                    cluster_id: String::from("cluster_2"),
3038                    backend_id: String::from("cluster_2-0"),
3039                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
3040                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3041                    ..Default::default()
3042                })
3043                .into(),
3044            )
3045            .expect("Could not execute request");
3046        state
3047            .dispatch(
3048                &RequestType::AddCluster(Cluster {
3049                    cluster_id: String::from("cluster_2"),
3050                    sticky_session: true,
3051                    https_redirect: true,
3052                    ..Default::default()
3053                })
3054                .into(),
3055            )
3056            .expect("Could not execute request");
3057
3058        let mut state2: ConfigState = Default::default();
3059        state2
3060            .dispatch(
3061                &RequestType::AddHttpFrontend(RequestHttpFrontend {
3062                    cluster_id: Some(String::from("cluster_1")),
3063                    hostname: String::from("lolcatho.st:8080"),
3064                    path: PathRule::prefix(String::from("/")),
3065                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3066                    position: RulePosition::Post.into(),
3067                    ..Default::default()
3068                })
3069                .into(),
3070            )
3071            .expect("Could not execute request");
3072        state2
3073            .dispatch(
3074                &RequestType::AddBackend(AddBackend {
3075                    cluster_id: String::from("cluster_1"),
3076                    backend_id: String::from("cluster_1-0"),
3077                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3078                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3079                    ..Default::default()
3080                })
3081                .into(),
3082            )
3083            .expect("Could not execute request");
3084        state2
3085            .dispatch(
3086                &RequestType::AddBackend(AddBackend {
3087                    cluster_id: String::from("cluster_1"),
3088                    backend_id: String::from("cluster_1-1"),
3089                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
3090                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3091                    ..Default::default()
3092                })
3093                .into(),
3094            )
3095            .expect("Could not execute request");
3096        state2
3097            .dispatch(
3098                &RequestType::AddBackend(AddBackend {
3099                    cluster_id: String::from("cluster_1"),
3100                    backend_id: String::from("cluster_1-2"),
3101                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3102                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3103                    ..Default::default()
3104                })
3105                .into(),
3106            )
3107            .expect("Could not execute request");
3108        state2
3109            .dispatch(
3110                &RequestType::AddCluster(Cluster {
3111                    cluster_id: String::from("cluster_3"),
3112                    sticky_session: false,
3113                    https_redirect: false,
3114                    ..Default::default()
3115                })
3116                .into(),
3117            )
3118            .expect("Could not execute request");
3119
3120        let e: Vec<Request> = vec![
3121            RequestType::RemoveHttpFrontend(RequestHttpFrontend {
3122                cluster_id: Some(String::from("cluster_2")),
3123                hostname: String::from("test.local"),
3124                path: PathRule::prefix(String::from("/abc")),
3125                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3126                ..Default::default()
3127            })
3128            .into(),
3129            RequestType::RemoveBackend(RemoveBackend {
3130                cluster_id: String::from("cluster_2"),
3131                backend_id: String::from("cluster_2-0"),
3132                address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
3133            })
3134            .into(),
3135            RequestType::AddBackend(AddBackend {
3136                cluster_id: String::from("cluster_1"),
3137                backend_id: String::from("cluster_1-2"),
3138                address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3139                load_balancing_parameters: Some(LoadBalancingParams::default()),
3140                ..Default::default()
3141            })
3142            .into(),
3143            RequestType::RemoveCluster(String::from("cluster_2")).into(),
3144            RequestType::AddCluster(Cluster {
3145                cluster_id: String::from("cluster_3"),
3146                sticky_session: false,
3147                https_redirect: false,
3148                ..Default::default()
3149            })
3150            .into(),
3151        ];
3152        let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
3153
3154        let d = state.diff(&state2);
3155        let diff = HashSet::from_iter(d.iter());
3156        println!("diff requests:\n{diff:#?}\n");
3157        println!("expected diff requests:\n{expected_diff:#?}\n");
3158
3159        let hash1 = state.hash_state();
3160        let hash2 = state2.hash_state();
3161        let mut state3 = state.clone();
3162        state3
3163            .dispatch(
3164                &RequestType::AddBackend(AddBackend {
3165                    cluster_id: String::from("cluster_1"),
3166                    backend_id: String::from("cluster_1-2"),
3167                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3168                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3169                    ..Default::default()
3170                })
3171                .into(),
3172            )
3173            .expect("Could not execute request");
3174        let hash3 = state3.hash_state();
3175        println!("state 1 hashes: {hash1:#?}");
3176        println!("state 2 hashes: {hash2:#?}");
3177        println!("state 3 hashes: {hash3:#?}");
3178
3179        assert_eq!(diff, expected_diff);
3180    }
3181
3182    #[test]
3183    fn cluster_ids_by_domain() {
3184        let mut config = ConfigState::new();
3185        let http_front_cluster1 = RequestHttpFrontend {
3186            cluster_id: Some(String::from("MyCluster_1")),
3187            hostname: String::from("lolcatho.st"),
3188            path: PathRule::prefix(String::from("")),
3189            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3190            ..Default::default()
3191        };
3192
3193        let https_front_cluster1 = RequestHttpFrontend {
3194            cluster_id: Some(String::from("MyCluster_1")),
3195            hostname: String::from("lolcatho.st"),
3196            path: PathRule::prefix(String::from("")),
3197            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3198            ..Default::default()
3199        };
3200
3201        let http_front_cluster2 = RequestHttpFrontend {
3202            cluster_id: Some(String::from("MyCluster_2")),
3203            hostname: String::from("lolcatho.st"),
3204            path: PathRule::prefix(String::from("/api")),
3205            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3206            ..Default::default()
3207        };
3208
3209        let https_front_cluster2 = RequestHttpFrontend {
3210            cluster_id: Some(String::from("MyCluster_2")),
3211            hostname: String::from("lolcatho.st"),
3212            path: PathRule::prefix(String::from("/api")),
3213            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3214            ..Default::default()
3215        };
3216
3217        config
3218            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster1).into())
3219            .expect("Could not execute request");
3220        config
3221            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster2).into())
3222            .expect("Could not execute request");
3223        config
3224            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster1).into())
3225            .expect("Could not execute request");
3226        config
3227            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster2).into())
3228            .expect("Could not execute request");
3229
3230        let mut cluster1_cluster2: HashSet<ClusterId> = HashSet::new();
3231        cluster1_cluster2.insert(String::from("MyCluster_1"));
3232        cluster1_cluster2.insert(String::from("MyCluster_2"));
3233
3234        let mut cluster2: HashSet<ClusterId> = HashSet::new();
3235        cluster2.insert(String::from("MyCluster_2"));
3236
3237        let empty: HashSet<ClusterId> = HashSet::new();
3238        assert_eq!(
3239            config.get_cluster_ids_by_domain(String::from("lolcatho.st"), None),
3240            cluster1_cluster2
3241        );
3242        assert_eq!(
3243            config
3244                .get_cluster_ids_by_domain(String::from("lolcatho.st"), Some(String::from("/api"))),
3245            cluster2
3246        );
3247        assert_eq!(
3248            config.get_cluster_ids_by_domain(String::from("lolcathost"), None),
3249            empty
3250        );
3251        assert_eq!(
3252            config
3253                .get_cluster_ids_by_domain(String::from("lolcathost"), Some(String::from("/sozu"))),
3254            empty
3255        );
3256    }
3257
3258    #[test]
3259    fn duplicate_backends() {
3260        let mut state: ConfigState = Default::default();
3261        state
3262            .dispatch(
3263                &RequestType::AddBackend(AddBackend {
3264                    cluster_id: String::from("cluster_1"),
3265                    backend_id: String::from("cluster_1-0"),
3266                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3267                    load_balancing_parameters: Some(LoadBalancingParams::default()),
3268                    ..Default::default()
3269                })
3270                .into(),
3271            )
3272            .expect("Could not execute request");
3273
3274        let b = Backend {
3275            cluster_id: String::from("cluster_1"),
3276            backend_id: String::from("cluster_1-0"),
3277            address: "127.0.0.1:1026".parse().unwrap(),
3278            load_balancing_parameters: Some(LoadBalancingParams::default()),
3279            sticky_id: Some("sticky".to_string()),
3280            backup: None,
3281        };
3282
3283        state
3284            .dispatch(&RequestType::AddBackend(b.clone().to_add_backend()).into())
3285            .expect("Could not execute order");
3286
3287        assert_eq!(state.backends.get("cluster_1").unwrap(), &vec![b]);
3288    }
3289
3290    #[test]
3291    fn remove_backend() {
3292        let mut state: ConfigState = Default::default();
3293        state
3294            .dispatch(
3295                &RequestType::AddCluster(Cluster {
3296                    cluster_id: String::from("cluster_1"),
3297                    ..Default::default()
3298                })
3299                .into(),
3300            )
3301            .expect("Could not execute request");
3302
3303        for i in 0..10 {
3304            state
3305                .dispatch(
3306                    &RequestType::AddBackend(AddBackend {
3307                        cluster_id: String::from("cluster_1"),
3308                        backend_id: format!("cluster_1-{i}"),
3309                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3310                        ..Default::default()
3311                    })
3312                    .into(),
3313                )
3314                .expect("Could not execute request");
3315        }
3316
3317        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 10);
3318
3319        let remove_backend_2 = RequestType::RemoveBackend(RemoveBackend {
3320            cluster_id: String::from("cluster_1"),
3321            backend_id: String::from("cluster_1-0"),
3322            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3323        })
3324        .into();
3325
3326        let remove_backend_result = state.dispatch(&remove_backend_2);
3327
3328        assert!(remove_backend_result.is_ok());
3329        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
3330
3331        let redundant_remove = state.dispatch(&remove_backend_2);
3332        assert!(matches!(redundant_remove, Err(StateError::NoChange)));
3333        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
3334    }
3335
3336    #[test]
3337    fn remove_backends_randomly() {
3338        let mut state: ConfigState = Default::default();
3339        state
3340            .dispatch(
3341                &RequestType::AddCluster(Cluster {
3342                    cluster_id: String::from("cluster_1"),
3343                    ..Default::default()
3344                })
3345                .into(),
3346            )
3347            .expect("Could not execute request");
3348
3349        for _ in 0..1000 {
3350            for i in 0..10 {
3351                state
3352                    .dispatch(
3353                        &RequestType::AddBackend(AddBackend {
3354                            cluster_id: String::from("cluster_1"),
3355                            backend_id: format!("cluster_1-{i}"),
3356                            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3357                            ..Default::default()
3358                        })
3359                        .into(),
3360                    )
3361                    .expect("Could not execute request");
3362            }
3363
3364            let mut rng = rng();
3365            let mut indexes = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
3366            indexes.shuffle(&mut rng);
3367            let random_count = rng.random_range(1..indexes.len());
3368            let random_indexes: Vec<i32> = indexes.into_iter().take(random_count).collect();
3369
3370            for j in random_indexes {
3371                let remove_backend_result = state.dispatch(
3372                    &RequestType::RemoveBackend(RemoveBackend {
3373                        cluster_id: String::from("cluster_1"),
3374                        backend_id: format!("cluster_1-{j}"),
3375                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3376                    })
3377                    .into(),
3378                );
3379                assert!(remove_backend_result.is_ok());
3380            }
3381        }
3382    }
3383
3384    #[test]
3385    fn listener_diff() {
3386        let mut state: ConfigState = Default::default();
3387        let custom_http_answers = Some(CustomHttpAnswers {
3388            answer_404: Some("test".to_string()),
3389            ..Default::default()
3390        });
3391        state
3392            .dispatch(
3393                &RequestType::AddTcpListener(TcpListenerConfig {
3394                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3395                    ..Default::default()
3396                })
3397                .into(),
3398            )
3399            .expect("Could not execute request");
3400        state
3401            .dispatch(
3402                &RequestType::ActivateListener(ActivateListener {
3403                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3404                    proxy: ListenerType::Tcp.into(),
3405                    from_scm: false,
3406                })
3407                .into(),
3408            )
3409            .expect("Could not execute request");
3410        state
3411            .dispatch(
3412                &RequestType::AddHttpListener(HttpListenerConfig {
3413                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3414                    ..Default::default()
3415                })
3416                .into(),
3417            )
3418            .expect("Could not execute request");
3419        state
3420            .dispatch(
3421                &RequestType::AddHttpsListener(HttpsListenerConfig {
3422                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3423                    ..Default::default()
3424                })
3425                .into(),
3426            )
3427            .expect("Could not execute request");
3428        state
3429            .dispatch(
3430                &RequestType::ActivateListener(ActivateListener {
3431                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3432                    proxy: ListenerType::Https.into(),
3433                    from_scm: false,
3434                })
3435                .into(),
3436            )
3437            .expect("Could not execute request");
3438
3439        let mut state2: ConfigState = Default::default();
3440        state2
3441            .dispatch(
3442                &RequestType::AddTcpListener(TcpListenerConfig {
3443                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3444                    expect_proxy: true,
3445                    ..Default::default()
3446                })
3447                .into(),
3448            )
3449            .expect("Could not execute request");
3450        state2
3451            .dispatch(
3452                &RequestType::AddHttpListener(HttpListenerConfig {
3453                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3454                    http_answers: custom_http_answers.clone(),
3455                    ..Default::default()
3456                })
3457                .into(),
3458            )
3459            .expect("Could not execute request");
3460        state2
3461            .dispatch(
3462                &RequestType::ActivateListener(ActivateListener {
3463                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3464                    proxy: ListenerType::Http.into(),
3465                    from_scm: false,
3466                })
3467                .into(),
3468            )
3469            .expect("Could not execute request");
3470        state2
3471            .dispatch(
3472                &RequestType::AddHttpsListener(HttpsListenerConfig {
3473                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3474                    http_answers: custom_http_answers.clone(),
3475                    ..Default::default()
3476                })
3477                .into(),
3478            )
3479            .expect("Could not execute request");
3480        state2
3481            .dispatch(
3482                &RequestType::ActivateListener(ActivateListener {
3483                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3484                    proxy: ListenerType::Https.into(),
3485                    from_scm: false,
3486                })
3487                .into(),
3488            )
3489            .expect("Could not execute request");
3490
3491        let e: Vec<Request> = vec![
3492            RequestType::RemoveListener(RemoveListener {
3493                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3494                proxy: ListenerType::Tcp.into(),
3495            })
3496            .into(),
3497            RequestType::AddTcpListener(TcpListenerConfig {
3498                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3499                expect_proxy: true,
3500                ..Default::default()
3501            })
3502            .into(),
3503            RequestType::DeactivateListener(DeactivateListener {
3504                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3505                proxy: ListenerType::Tcp.into(),
3506                to_scm: false,
3507            })
3508            .into(),
3509            RequestType::RemoveListener(RemoveListener {
3510                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3511                proxy: ListenerType::Http.into(),
3512            })
3513            .into(),
3514            RequestType::AddHttpListener(HttpListenerConfig {
3515                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3516                http_answers: custom_http_answers.clone(),
3517                ..Default::default()
3518            })
3519            .into(),
3520            RequestType::ActivateListener(ActivateListener {
3521                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3522                proxy: ListenerType::Http.into(),
3523                from_scm: false,
3524            })
3525            .into(),
3526            RequestType::RemoveListener(RemoveListener {
3527                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3528                proxy: ListenerType::Https.into(),
3529            })
3530            .into(),
3531            RequestType::AddHttpsListener(HttpsListenerConfig {
3532                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3533                http_answers: custom_http_answers.clone(),
3534                ..Default::default()
3535            })
3536            .into(),
3537            // The 8443 HTTPS listener is active in both states but its content
3538            // changed (custom answers). The Remove + Add(active=false) above
3539            // wipes the active flag, so diff must re-emit an ActivateListener to
3540            // keep the listener live across the hot reconfig. Without it the
3541            // worker would silently deactivate the listener on replay.
3542            RequestType::ActivateListener(ActivateListener {
3543                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3544                proxy: ListenerType::Https.into(),
3545                from_scm: false,
3546            })
3547            .into(),
3548        ];
3549
3550        let diff = state.diff(&state2);
3551        //let diff: HashSet<&RequestContent> = HashSet::from_iter(d.iter());
3552        println!("expected diff requests:\n{e:#?}\n");
3553        println!("diff requests:\n{diff:#?}\n");
3554
3555        let _hash1 = state.hash_state();
3556        let _hash2 = state2.hash_state();
3557
3558        assert_eq!(diff, e);
3559
3560        // Round-trip: replaying diff(state -> state2) onto a clone of `state`
3561        // must reproduce `state2`'s listener maps EXACTLY, active flag included.
3562        // This is the hot-reconfig correctness property — a worker applies these
3563        // requests and must converge on the target state. In particular the 8443
3564        // HTTPS listener (active in both states, content changed) must come back
3565        // ACTIVE, which is the bug the ActivateListener re-emission above fixes.
3566        let mut replayed = state.clone();
3567        for request in &diff {
3568            replayed
3569                .dispatch(request)
3570                .expect("every diff request must replay cleanly onto the source state");
3571        }
3572        assert_eq!(
3573            replayed.tcp_listeners, state2.tcp_listeners,
3574            "replayed tcp_listeners must match the target state"
3575        );
3576        assert_eq!(
3577            replayed.http_listeners, state2.http_listeners,
3578            "replayed http_listeners must match the target state"
3579        );
3580        assert_eq!(
3581            replayed.https_listeners, state2.https_listeners,
3582            "replayed https_listeners must match the target state"
3583        );
3584        // Explicitly assert the still-active listener stays active across the
3585        // config change (the core of the fixed bug).
3586        let replayed_8443 = replayed
3587            .https_listeners
3588            .get(&SocketAddr::from(SocketAddress::new_v4(0, 0, 0, 0, 8443)))
3589            .expect("8443 HTTPS listener must exist after replay");
3590        assert!(
3591            replayed_8443.active,
3592            "the 8443 HTTPS listener must stay ACTIVE across a config change"
3593        );
3594    }
3595
3596    #[test]
3597    fn certificate_retrieval() {
3598        let mut state: ConfigState = Default::default();
3599        let certificate_and_key = CertificateAndKey {
3600            certificate: String::from(include_str!("../assets/certificate.pem")),
3601            key: String::from(include_str!("../assets/key.pem")),
3602            certificate_chain: vec![],
3603            versions: vec![],
3604            names: vec!["lolcatho.st".to_string()],
3605        };
3606        let add_certificate = AddCertificate {
3607            address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
3608            certificate: certificate_and_key,
3609            expired_at: None,
3610        };
3611        state
3612            .dispatch(&RequestType::AddCertificate(add_certificate).into())
3613            .expect("Could not add certificate");
3614
3615        println!("state: {state:#?}");
3616
3617        // let fingerprint: Fingerprint = serde_json::from_str(
3618        //     "\"ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5\"",
3619        // )
3620        // .expect("Could not deserialize the fingerprint");
3621
3622        let certificates_found_by_fingerprint = state.get_certificates(QueryCertificatesFilters {
3623            domain: None,
3624            fingerprint: Some(
3625                "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5".to_string(),
3626            ),
3627        });
3628
3629        println!("found certificate: {certificates_found_by_fingerprint:#?}");
3630
3631        assert!(!certificates_found_by_fingerprint.is_empty());
3632
3633        let certificate_found_by_domain_name = state.get_certificates(QueryCertificatesFilters {
3634            domain: Some("lolcatho.st".to_string()),
3635            fingerprint: None,
3636        });
3637
3638        assert!(!certificate_found_by_domain_name.is_empty());
3639    }
3640
3641    #[test]
3642    fn count_backends_across_clusters() {
3643        let mut state: ConfigState = Default::default();
3644
3645        assert_eq!(state.count_backends(), 0);
3646
3647        state
3648            .dispatch(
3649                &RequestType::AddBackend(AddBackend {
3650                    cluster_id: String::from("cluster_1"),
3651                    backend_id: String::from("cluster_1-0"),
3652                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3653                    ..Default::default()
3654                })
3655                .into(),
3656            )
3657            .expect("Could not execute request");
3658        assert_eq!(state.count_backends(), 1);
3659
3660        state
3661            .dispatch(
3662                &RequestType::AddBackend(AddBackend {
3663                    cluster_id: String::from("cluster_1"),
3664                    backend_id: String::from("cluster_1-1"),
3665                    address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
3666                    ..Default::default()
3667                })
3668                .into(),
3669            )
3670            .expect("Could not execute request");
3671        assert_eq!(state.count_backends(), 2);
3672
3673        // add backend to a second cluster
3674        state
3675            .dispatch(
3676                &RequestType::AddBackend(AddBackend {
3677                    cluster_id: String::from("cluster_2"),
3678                    backend_id: String::from("cluster_2-0"),
3679                    address: SocketAddress::new_v4(192, 168, 1, 1, 8080),
3680                    ..Default::default()
3681                })
3682                .into(),
3683            )
3684            .expect("Could not execute request");
3685        assert_eq!(state.count_backends(), 3);
3686
3687        // remove a backend and verify count decreases
3688        state
3689            .dispatch(
3690                &RequestType::RemoveBackend(RemoveBackend {
3691                    cluster_id: String::from("cluster_1"),
3692                    backend_id: String::from("cluster_1-0"),
3693                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3694                })
3695                .into(),
3696            )
3697            .expect("Could not execute request");
3698        assert_eq!(state.count_backends(), 2);
3699    }
3700
3701    #[test]
3702    fn count_frontends_across_types() {
3703        let mut state: ConfigState = Default::default();
3704
3705        assert_eq!(state.count_frontends(), 0);
3706
3707        // add an HTTP frontend
3708        state
3709            .dispatch(
3710                &RequestType::AddHttpFrontend(RequestHttpFrontend {
3711                    cluster_id: Some(String::from("cluster_1")),
3712                    hostname: String::from("example.com"),
3713                    path: PathRule::prefix(String::from("/")),
3714                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3715                    position: RulePosition::Tree.into(),
3716                    ..Default::default()
3717                })
3718                .into(),
3719            )
3720            .expect("Could not execute request");
3721        assert_eq!(state.count_frontends(), 1);
3722
3723        // add an HTTPS frontend
3724        state
3725            .dispatch(
3726                &RequestType::AddHttpsFrontend(RequestHttpFrontend {
3727                    cluster_id: Some(String::from("cluster_1")),
3728                    hostname: String::from("secure.example.com"),
3729                    path: PathRule::prefix(String::from("/")),
3730                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3731                    position: RulePosition::Tree.into(),
3732                    ..Default::default()
3733                })
3734                .into(),
3735            )
3736            .expect("Could not execute request");
3737        assert_eq!(state.count_frontends(), 2);
3738
3739        // add a TCP frontend
3740        state
3741            .dispatch(
3742                &RequestType::AddTcpFrontend(RequestTcpFrontend {
3743                    cluster_id: String::from("cluster_2"),
3744                    address: SocketAddress::new_v4(0, 0, 0, 0, 5432),
3745                    ..Default::default()
3746                })
3747                .into(),
3748            )
3749            .expect("Could not execute request");
3750        assert_eq!(state.count_frontends(), 3);
3751
3752        // add a second TCP frontend on the same cluster
3753        state
3754            .dispatch(
3755                &RequestType::AddTcpFrontend(RequestTcpFrontend {
3756                    cluster_id: String::from("cluster_2"),
3757                    address: SocketAddress::new_v4(0, 0, 0, 0, 5433),
3758                    ..Default::default()
3759                })
3760                .into(),
3761            )
3762            .expect("Could not execute request");
3763        assert_eq!(state.count_frontends(), 4);
3764
3765        // remove the HTTP frontend
3766        state
3767            .dispatch(
3768                &RequestType::RemoveHttpFrontend(RequestHttpFrontend {
3769                    cluster_id: Some(String::from("cluster_1")),
3770                    hostname: String::from("example.com"),
3771                    path: PathRule::prefix(String::from("/")),
3772                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3773                    position: RulePosition::Tree.into(),
3774                    ..Default::default()
3775                })
3776                .into(),
3777            )
3778            .expect("Could not execute request");
3779        assert_eq!(state.count_frontends(), 3);
3780    }
3781
3782    // ── helpers ────────────────────────────────────────────────────────────────
3783
3784    fn make_https_listener(address: SocketAddress) -> HttpsListenerConfig {
3785        HttpsListenerConfig {
3786            address,
3787            sticky_name: "SOZUBALANCEID".to_owned(),
3788            front_timeout: 60,
3789            back_timeout: 30,
3790            connect_timeout: 3,
3791            request_timeout: 10,
3792            ..Default::default()
3793        }
3794    }
3795
3796    fn make_http_listener(address: SocketAddress) -> HttpListenerConfig {
3797        HttpListenerConfig {
3798            address,
3799            sticky_name: "SOZUBALANCEID".to_owned(),
3800            front_timeout: 60,
3801            back_timeout: 30,
3802            connect_timeout: 3,
3803            request_timeout: 10,
3804            ..Default::default()
3805        }
3806    }
3807
3808    fn make_tcp_listener(address: SocketAddress) -> TcpListenerConfig {
3809        TcpListenerConfig {
3810            address,
3811            front_timeout: 60,
3812            back_timeout: 30,
3813            connect_timeout: 3,
3814            ..Default::default()
3815        }
3816    }
3817
3818    fn make_udp_listener(address: SocketAddress, active: bool) -> UdpListenerConfig {
3819        UdpListenerConfig {
3820            address,
3821            public_address: None,
3822            front_timeout: 30,
3823            back_timeout: 30,
3824            max_rx_datagram_size: 1500,
3825            max_flows: 0,
3826            active,
3827        }
3828    }
3829
3830    /// Mandatory roundtrip guard for the UDP control-plane data model.
3831    ///
3832    /// 1. Build a state holding an active UDP listener + UDP frontend, run
3833    ///    `generate_requests()`, replay every emitted request into a fresh
3834    ///    `ConfigState`, and assert the two states are byte-for-byte equal.
3835    /// 2. Prove a `diff()` between an empty state and the UDP-bearing state
3836    ///    produces requests that, replayed, reconstruct the UDP listener and
3837    ///    frontend (a hot-add path), and that the reverse diff tears them
3838    ///    down again.
3839    #[test]
3840    fn test_udp_state_roundtrip() {
3841        let address = SocketAddress::new_v4(127, 0, 0, 1, 5353);
3842
3843        let mut state = ConfigState::default();
3844        state
3845            .dispatch(&RequestType::AddUdpListener(make_udp_listener(address, true)).into())
3846            .expect("could not add udp listener");
3847        state
3848            .dispatch(
3849                &RequestType::ActivateListener(ActivateListener {
3850                    address,
3851                    proxy: ListenerType::Udp.into(),
3852                    from_scm: false,
3853                })
3854                .into(),
3855            )
3856            .expect("could not activate udp listener");
3857        state
3858            .dispatch(
3859                &RequestType::AddUdpFrontend(RequestUdpFrontend {
3860                    cluster_id: "udp_cluster".to_string(),
3861                    address,
3862                    tags: BTreeMap::from([("owner".to_string(), "team".to_string())]),
3863                })
3864                .into(),
3865            )
3866            .expect("could not add udp frontend");
3867
3868        assert_eq!(state.udp_listeners.len(), 1);
3869        assert!(state.udp_listeners[&address.into()].active);
3870        assert_eq!(
3871            state.udp_fronts.get("udp_cluster").map(Vec::len),
3872            Some(1usize)
3873        );
3874
3875        // `request_counts` is a runtime census side-effect of `dispatch`; it
3876        // diverges by construction whenever the number/shape of replayed
3877        // requests differs from the originals, so compare the logical config
3878        // with the census cleared on both sides.
3879        let logical = |s: &ConfigState| {
3880            let mut c = s.clone();
3881            c.request_counts.clear();
3882            c
3883        };
3884
3885        // 1. generate_requests → replay → equal
3886        let mut replayed = ConfigState::default();
3887        for request in state.generate_requests() {
3888            replayed
3889                .dispatch(&request)
3890                .expect("could not replay generated request");
3891        }
3892        assert_eq!(
3893            logical(&state),
3894            logical(&replayed),
3895            "UDP listener + frontend must survive generate_requests → replay"
3896        );
3897
3898        // 2. diff from empty reconstructs the UDP objects
3899        let empty = ConfigState::default();
3900        let mut from_diff = ConfigState::default();
3901        for request in empty.diff(&state) {
3902            from_diff
3903                .dispatch(&request)
3904                .expect("could not replay diff request");
3905        }
3906        assert_eq!(
3907            logical(&state),
3908            logical(&from_diff),
3909            "diff(empty -> state) must reconstruct the UDP listener + frontend"
3910        );
3911
3912        // reverse diff tears them back down to empty
3913        let mut torn_down = state.clone();
3914        for request in state.diff(&empty) {
3915            torn_down
3916                .dispatch(&request)
3917                .expect("could not replay teardown diff request");
3918        }
3919        assert!(
3920            torn_down.udp_listeners.is_empty(),
3921            "diff(state -> empty) must remove the UDP listener"
3922        );
3923        assert!(
3924            torn_down
3925                .udp_fronts
3926                .get("udp_cluster")
3927                .map(Vec::is_empty)
3928                .unwrap_or(true),
3929            "diff(state -> empty) must remove the UDP frontend"
3930        );
3931
3932        // update path: a partial patch mutates the stored listener in place
3933        state
3934            .dispatch(
3935                &RequestType::UpdateUdpListener(UpdateUdpListenerConfig {
3936                    address,
3937                    max_flows: Some(4096),
3938                    front_timeout: Some(15),
3939                    ..Default::default()
3940                })
3941                .into(),
3942            )
3943            .expect("could not update udp listener");
3944        let updated = &state.udp_listeners[&address.into()];
3945        assert_eq!(updated.max_flows, 4096);
3946        assert_eq!(updated.front_timeout, 15);
3947        assert_eq!(
3948            updated.back_timeout, 30,
3949            "unpatched field must be preserved"
3950        );
3951    }
3952
3953    /// `list_frontends` must surface UDP frontends alongside TCP ones. The
3954    /// proto `FrontendFilters` has no `udp` flag, so UDP rides the default
3955    /// (all-pass) and `tcp` filters; a `domain` filter excludes both.
3956    #[test]
3957    fn list_frontends_includes_udp() {
3958        let tcp_addr = SocketAddress::new_v4(0, 0, 0, 0, 6379);
3959        let udp_addr = SocketAddress::new_v4(0, 0, 0, 0, 5353);
3960
3961        let mut state = ConfigState::default();
3962        state
3963            .dispatch(
3964                &RequestType::AddTcpFrontend(RequestTcpFrontend {
3965                    cluster_id: "tcp_cluster".to_string(),
3966                    address: tcp_addr,
3967                    ..Default::default()
3968                })
3969                .into(),
3970            )
3971            .expect("could not add tcp frontend");
3972        state
3973            .dispatch(
3974                &RequestType::AddUdpFrontend(RequestUdpFrontend {
3975                    cluster_id: "udp_cluster".to_string(),
3976                    address: udp_addr,
3977                    ..Default::default()
3978                })
3979                .into(),
3980            )
3981            .expect("could not add udp frontend");
3982
3983        // default filters (all false, no domain) → list everything, incl. UDP
3984        let all = state.list_frontends(FrontendFilters::default());
3985        assert_eq!(all.tcp_frontends.len(), 1, "tcp frontend must be listed");
3986        assert_eq!(
3987            all.udp_frontends.len(),
3988            1,
3989            "udp frontend must be listed under the default all-pass path"
3990        );
3991        assert_eq!(all.udp_frontends[0].cluster_id, "udp_cluster");
3992        assert_eq!(all.udp_frontends[0].address, udp_addr);
3993
3994        // explicit `tcp` filter surfaces both TCP and UDP (no `udp` flag exists)
3995        let tcp_only = state.list_frontends(FrontendFilters {
3996            tcp: true,
3997            ..Default::default()
3998        });
3999        assert_eq!(tcp_only.tcp_frontends.len(), 1);
4000        assert_eq!(
4001            tcp_only.udp_frontends.len(),
4002            1,
4003            "udp frontends ride the tcp filter"
4004        );
4005
4006        // an `http` filter excludes both TCP and UDP
4007        let http_only = state.list_frontends(FrontendFilters {
4008            http: true,
4009            ..Default::default()
4010        });
4011        assert!(http_only.tcp_frontends.is_empty());
4012        assert!(http_only.udp_frontends.is_empty());
4013
4014        // a `domain` filter excludes UDP (no hostname), matching TCP behaviour
4015        let domain_filtered = state.list_frontends(FrontendFilters {
4016            domain: Some("example.com".to_string()),
4017            ..Default::default()
4018        });
4019        assert!(domain_filtered.tcp_frontends.is_empty());
4020        assert!(domain_filtered.udp_frontends.is_empty());
4021    }
4022
4023    // ── update_https_listener ──────────────────────────────────────────────────
4024
4025    /// Happy path: patching two H2 flood knobs updates the map entry; all other
4026    /// fields are left untouched.
4027    #[test]
4028    fn update_https_listener_happy_path_h2_knobs() {
4029        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4030        let mut state = ConfigState::new();
4031        state
4032            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4033            .unwrap();
4034
4035        let patch = UpdateHttpsListenerConfig {
4036            address: addr,
4037            h2_max_rst_stream_per_window: Some(50),
4038            h2_max_ping_per_window: Some(20),
4039            ..Default::default()
4040        };
4041        state
4042            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4043            .expect("update must succeed");
4044
4045        let listener = state
4046            .https_listeners
4047            .get(&SocketAddr::from(addr))
4048            .expect("listener must be present");
4049        assert_eq!(listener.h2_max_rst_stream_per_window, Some(50));
4050        assert_eq!(listener.h2_max_ping_per_window, Some(20));
4051        // Untouched fields must be unchanged
4052        assert_eq!(listener.front_timeout, 60);
4053        assert_eq!(listener.h2_max_settings_per_window, None);
4054    }
4055
4056    /// NotFound: patching a listener address that was never registered returns
4057    /// `StateError::NotFound`.
4058    #[test]
4059    fn update_https_listener_not_found() {
4060        let mut state = ConfigState::new();
4061        let patch = UpdateHttpsListenerConfig {
4062            address: SocketAddress::new_v4(1, 2, 3, 4, 9999),
4063            h2_max_rst_stream_per_window: Some(50),
4064            ..Default::default()
4065        };
4066        let err = state
4067            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4068            .unwrap_err();
4069        assert!(
4070            matches!(
4071                err,
4072                StateError::NotFound {
4073                    kind: ObjectKind::HttpsListener,
4074                    ..
4075                }
4076            ),
4077            "expected NotFound, got: {err}"
4078        );
4079    }
4080
4081    /// No-op: a patch with only `address` set (all options None) is Ok and does
4082    /// not change any field.
4083    #[test]
4084    fn update_https_listener_noop_patch() {
4085        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4086        let mut state = ConfigState::new();
4087        let original = make_https_listener(addr);
4088        state
4089            .dispatch(&RequestType::AddHttpsListener(original.clone()).into())
4090            .unwrap();
4091
4092        let patch = UpdateHttpsListenerConfig {
4093            address: addr,
4094            ..Default::default()
4095        };
4096        state
4097            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4098            .expect("no-op patch must succeed");
4099
4100        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4101        assert_eq!(listener.front_timeout, original.front_timeout);
4102        assert_eq!(
4103            listener.h2_max_rst_stream_per_window,
4104            original.h2_max_rst_stream_per_window
4105        );
4106    }
4107
4108    /// InvalidValue: setting a flood knob to 0 must be rejected.
4109    #[test]
4110    fn update_https_listener_invalid_value_flood_knob_zero() {
4111        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4112        let mut state = ConfigState::new();
4113        state
4114            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4115            .unwrap();
4116
4117        let patch = UpdateHttpsListenerConfig {
4118            address: addr,
4119            h2_max_rst_stream_per_window: Some(0),
4120            ..Default::default()
4121        };
4122        let err = state
4123            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4124            .unwrap_err();
4125        assert!(
4126            matches!(
4127                err,
4128                StateError::InvalidValue {
4129                    field: "h2_max_rst_stream_per_window",
4130                    ..
4131                }
4132            ),
4133            "expected InvalidValue for flood knob 0, got: {err}"
4134        );
4135    }
4136
4137    /// AddCluster: an inline `cluster.health_check` with a CRLF-bearing URI
4138    /// must be rejected before the cluster lands in `ConfigState`. Without
4139    /// this guard, TOML reload / SaveState / direct API AddCluster requests
4140    /// bypass the SetHealthCheck-side check and let an attacker-controlled
4141    /// health-check URI smuggle CR/LF into outbound HTTP/1.1 probes.
4142    #[test]
4143    fn add_cluster_invalid_health_check_uri_rejected() {
4144        use crate::proto::command::HealthCheckConfig;
4145
4146        let mut state = ConfigState::new();
4147        let err = state
4148            .dispatch(
4149                &RequestType::AddCluster(Cluster {
4150                    cluster_id: String::from("evil_cluster"),
4151                    health_check: Some(HealthCheckConfig {
4152                        uri: String::from("/foo\r\nGET /admin"),
4153                        interval: 5_000,
4154                        timeout: 1_000,
4155                        healthy_threshold: 2,
4156                        unhealthy_threshold: 2,
4157                        ..Default::default()
4158                    }),
4159                    ..Default::default()
4160                })
4161                .into(),
4162            )
4163            .unwrap_err();
4164
4165        assert!(
4166            matches!(
4167                err,
4168                StateError::InvalidValue {
4169                    field: "health_check",
4170                    ..
4171                }
4172            ),
4173            "expected InvalidValue for CRLF-bearing health-check URI, got: {err}"
4174        );
4175        assert!(
4176            !state.clusters.contains_key("evil_cluster"),
4177            "cluster must not be inserted when health_check fails validation",
4178        );
4179    }
4180
4181    /// ALPN validation: reject unknown ALPN values.
4182    #[test]
4183    fn update_https_listener_alpn_unknown_value_rejected() {
4184        use crate::proto::command::AlpnProtocols;
4185
4186        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4187        let mut state = ConfigState::new();
4188        state
4189            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4190            .unwrap();
4191
4192        let patch = UpdateHttpsListenerConfig {
4193            address: addr,
4194            alpn_protocols: Some(AlpnProtocols {
4195                values: vec!["h3".to_owned()],
4196            }),
4197            ..Default::default()
4198        };
4199        let err = state
4200            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4201            .unwrap_err();
4202        assert!(
4203            matches!(
4204                err,
4205                StateError::InvalidValue {
4206                    field: "alpn_protocols",
4207                    ..
4208                }
4209            ),
4210            "expected InvalidValue for unknown ALPN, got: {err}"
4211        );
4212    }
4213
4214    /// ALPN validation: empty values vec = reset to default, must be accepted.
4215    #[test]
4216    fn update_https_listener_alpn_empty_reset_accepted() {
4217        use crate::proto::command::AlpnProtocols;
4218
4219        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4220        let mut state = ConfigState::new();
4221        let mut listener = make_https_listener(addr);
4222        listener.alpn_protocols = vec!["h2".to_owned()];
4223        state
4224            .dispatch(&RequestType::AddHttpsListener(listener).into())
4225            .unwrap();
4226
4227        let patch = UpdateHttpsListenerConfig {
4228            address: addr,
4229            alpn_protocols: Some(AlpnProtocols { values: vec![] }),
4230            ..Default::default()
4231        };
4232        state
4233            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4234            .expect("empty ALPN reset must succeed");
4235
4236        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4237        assert!(
4238            listener.alpn_protocols.is_empty(),
4239            "ALPN must have been reset to empty"
4240        );
4241    }
4242
4243    /// ALPN validation: valid values ["h2", "http/1.1"] must be accepted.
4244    #[test]
4245    fn update_https_listener_alpn_valid_values_accepted() {
4246        use crate::proto::command::AlpnProtocols;
4247
4248        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4249        let mut state = ConfigState::new();
4250        state
4251            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4252            .unwrap();
4253
4254        let patch = UpdateHttpsListenerConfig {
4255            address: addr,
4256            alpn_protocols: Some(AlpnProtocols {
4257                values: vec!["h2".to_owned(), "http/1.1".to_owned()],
4258            }),
4259            ..Default::default()
4260        };
4261        state
4262            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4263            .expect("valid ALPN must be accepted");
4264
4265        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4266        assert_eq!(listener.alpn_protocols, vec!["h2", "http/1.1"]);
4267    }
4268
4269    /// ALPN absent wrapper: when `alpn_protocols` is None in the patch, the
4270    /// listener's ALPN field must not be touched.
4271    #[test]
4272    fn update_https_listener_alpn_absent_preserves_existing() {
4273        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4274        let mut state = ConfigState::new();
4275        let mut listener = make_https_listener(addr);
4276        listener.alpn_protocols = vec!["h2".to_owned()];
4277        state
4278            .dispatch(&RequestType::AddHttpsListener(listener).into())
4279            .unwrap();
4280
4281        // patch with no alpn_protocols field
4282        let patch = UpdateHttpsListenerConfig {
4283            address: addr,
4284            front_timeout: Some(10),
4285            ..Default::default()
4286        };
4287        state
4288            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4289            .unwrap();
4290
4291        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4292        assert_eq!(
4293            listener.alpn_protocols,
4294            vec!["h2"],
4295            "ALPN must be preserved when not patched"
4296        );
4297    }
4298
4299    /// sozu_id_header validation: empty string must be rejected.
4300    #[test]
4301    fn update_https_listener_sozu_id_header_empty_rejected() {
4302        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4303        let mut state = ConfigState::new();
4304        state
4305            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4306            .unwrap();
4307
4308        let patch = UpdateHttpsListenerConfig {
4309            address: addr,
4310            sozu_id_header: Some(String::new()),
4311            ..Default::default()
4312        };
4313        let err = state
4314            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4315            .unwrap_err();
4316        assert!(
4317            matches!(
4318                err,
4319                StateError::InvalidValue {
4320                    field: "sozu_id_header",
4321                    ..
4322                }
4323            ),
4324            "expected InvalidValue for empty header name, got: {err}"
4325        );
4326    }
4327
4328    /// sozu_id_header validation: value containing colon must be rejected.
4329    #[test]
4330    fn update_https_listener_sozu_id_header_colon_rejected() {
4331        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4332        let mut state = ConfigState::new();
4333        state
4334            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4335            .unwrap();
4336
4337        let patch = UpdateHttpsListenerConfig {
4338            address: addr,
4339            sozu_id_header: Some("bad: value".to_owned()),
4340            ..Default::default()
4341        };
4342        let err = state
4343            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4344            .unwrap_err();
4345        assert!(
4346            matches!(
4347                err,
4348                StateError::InvalidValue {
4349                    field: "sozu_id_header",
4350                    ..
4351                }
4352            ),
4353            "expected InvalidValue for header name with colon, got: {err}"
4354        );
4355    }
4356
4357    /// sozu_id_header validation: well-formed token must be accepted.
4358    #[test]
4359    fn update_https_listener_sozu_id_header_valid_accepted() {
4360        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4361        let mut state = ConfigState::new();
4362        state
4363            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4364            .unwrap();
4365
4366        let patch = UpdateHttpsListenerConfig {
4367            address: addr,
4368            sozu_id_header: Some("X-Edge-Id".to_owned()),
4369            ..Default::default()
4370        };
4371        state
4372            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4373            .expect("valid header name must be accepted");
4374
4375        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4376        assert_eq!(listener.sozu_id_header.as_deref(), Some("X-Edge-Id"));
4377    }
4378
4379    /// h2_graceful_shutdown_deadline_seconds = 0 must be allowed (means "wait forever").
4380    #[test]
4381    fn update_https_listener_graceful_shutdown_zero_allowed() {
4382        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4383        let mut state = ConfigState::new();
4384        state
4385            .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4386            .unwrap();
4387
4388        let patch = UpdateHttpsListenerConfig {
4389            address: addr,
4390            h2_graceful_shutdown_deadline_seconds: Some(0),
4391            ..Default::default()
4392        };
4393        state
4394            .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4395            .expect("graceful_shutdown_deadline=0 must be allowed");
4396
4397        let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4398        assert_eq!(listener.h2_graceful_shutdown_deadline_seconds, Some(0));
4399    }
4400
4401    // ── update_http_listener ───────────────────────────────────────────────────
4402
4403    /// Happy path for HTTP listener patch.
4404    #[test]
4405    fn update_http_listener_happy_path() {
4406        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
4407        let mut state = ConfigState::new();
4408        state
4409            .dispatch(&RequestType::AddHttpListener(make_http_listener(addr)).into())
4410            .unwrap();
4411
4412        let patch = UpdateHttpListenerConfig {
4413            address: addr,
4414            front_timeout: Some(15),
4415            h2_max_rst_stream_per_window: Some(25),
4416            ..Default::default()
4417        };
4418        state
4419            .dispatch(&RequestType::UpdateHttpListener(patch).into())
4420            .expect("HTTP update must succeed");
4421
4422        let listener = state.http_listeners.get(&SocketAddr::from(addr)).unwrap();
4423        assert_eq!(listener.front_timeout, 15);
4424        assert_eq!(listener.h2_max_rst_stream_per_window, Some(25));
4425        // untouched
4426        assert_eq!(listener.back_timeout, 30);
4427    }
4428
4429    /// HTTP listener: flood knob 0 is rejected.
4430    #[test]
4431    fn update_http_listener_flood_knob_zero_rejected() {
4432        let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
4433        let mut state = ConfigState::new();
4434        state
4435            .dispatch(&RequestType::AddHttpListener(make_http_listener(addr)).into())
4436            .unwrap();
4437
4438        let patch = UpdateHttpListenerConfig {
4439            address: addr,
4440            h2_max_window_update_stream0_per_window: Some(0),
4441            ..Default::default()
4442        };
4443        let err = state
4444            .dispatch(&RequestType::UpdateHttpListener(patch).into())
4445            .unwrap_err();
4446        assert!(
4447            matches!(
4448                err,
4449                StateError::InvalidValue {
4450                    field: "h2_max_window_update_stream0_per_window",
4451                    ..
4452                }
4453            ),
4454            "expected InvalidValue, got: {err}"
4455        );
4456    }
4457
4458    // ── update_tcp_listener ────────────────────────────────────────────────────
4459
4460    /// Happy path for TCP listener patch.
4461    #[test]
4462    fn update_tcp_listener_happy_path() {
4463        let addr = SocketAddress::new_v4(0, 0, 0, 0, 9000);
4464        let mut state = ConfigState::new();
4465        state
4466            .dispatch(&RequestType::AddTcpListener(make_tcp_listener(addr)).into())
4467            .unwrap();
4468
4469        let patch = UpdateTcpListenerConfig {
4470            address: addr,
4471            front_timeout: Some(5),
4472            ..Default::default()
4473        };
4474        state
4475            .dispatch(&RequestType::UpdateTcpListener(patch).into())
4476            .expect("TCP update must succeed");
4477
4478        let listener = state.tcp_listeners.get(&SocketAddr::from(addr)).unwrap();
4479        assert_eq!(listener.front_timeout, 5);
4480        assert_eq!(listener.back_timeout, 30); // untouched
4481    }
4482
4483    /// TCP listener: NotFound when address is unknown.
4484    #[test]
4485    fn update_tcp_listener_not_found() {
4486        let mut state = ConfigState::new();
4487        let patch = UpdateTcpListenerConfig {
4488            address: SocketAddress::new_v4(9, 9, 9, 9, 9999),
4489            front_timeout: Some(5),
4490            ..Default::default()
4491        };
4492        let err = state
4493            .dispatch(&RequestType::UpdateTcpListener(patch).into())
4494            .unwrap_err();
4495        assert!(
4496            matches!(
4497                err,
4498                StateError::NotFound {
4499                    kind: ObjectKind::TcpListener,
4500                    ..
4501                }
4502            ),
4503            "expected NotFound, got: {err}"
4504        );
4505    }
4506
4507    /// `ConfigState::dispatch` MUST treat `SetMetricDetail` as a
4508    /// runtime-only verb (no persisted state mutation). A future
4509    /// refactor that drops the variant from the no-op match arm and
4510    /// falls through to the catch-all would silently re-break the
4511    /// SetMetricDetail dispatch path with `UndispatchableRequest`.
4512    #[test]
4513    fn dispatch_passes_through_set_metric_detail() {
4514        use crate::proto::command::{MetricDetail, SetMetricDetail};
4515        let mut state = ConfigState::new();
4516        let req: Request = RequestType::SetMetricDetail(SetMetricDetail {
4517            client_id: "test:1".to_owned(),
4518            detail: Some(MetricDetail::DetailBackend as i32),
4519            ttl_seconds: Some(60),
4520            clear: Some(false),
4521            reason: Some("regression-guard".to_owned()),
4522            peer_pid: None,
4523            peer_session_ulid: None,
4524        })
4525        .into();
4526        state
4527            .dispatch(&req)
4528            .expect("SetMetricDetail must traverse dispatch without UndispatchableRequest");
4529    }
4530}