Skip to main content

sozu_command_lib/
state.rs

1use std::{
2    collections::{
3        BTreeMap, BTreeSet, HashMap, HashSet, btree_map::Entry as BTreeMapEntry,
4        hash_map::DefaultHasher,
5    },
6    fs::File,
7    hash::{Hash, Hasher},
8    io::Write,
9    iter::repeat,
10    net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16    ObjectKind,
17    certificate::{CertificateError, Fingerprint, calculate_fingerprint},
18    proto::{
19        command::{
20            ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster,
21            ClusterInformation, CustomHttpAnswers, DeactivateListener, FrontendFilters,
22            HealthChecksList, HttpListenerConfig, HttpsListenerConfig, InitialState,
23            ListedFrontends, ListenerType, ListenersList, PathRule, QueryCertificatesFilters,
24            RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate, Request,
25            RequestCounts, RequestHttpFrontend, RequestTcpFrontend, SetHealthCheck, SocketAddress,
26            TcpListenerConfig, UpdateHttpListenerConfig, UpdateHttpsListenerConfig,
27            UpdateTcpListenerConfig, WorkerRequest, request::RequestType,
28        },
29        display::format_request_type,
30    },
31    response::{Backend, HttpFrontend, TcpFrontend},
32};
33
/// Identifier of a cluster. A plain `String` alias, meant to be used throughout Sōzu.
pub type ClusterId = String;
36
37#[derive(thiserror::Error, Debug)]
38pub enum StateError {
39    #[error("Request came in empty")]
40    EmptyRequest,
41    #[error("dispatching this request did not bring any change to the state")]
42    NoChange,
43    #[error("State can not handle this request")]
44    UndispatchableRequest,
45    #[error("Did not find {kind:?} with address or id '{id}'")]
46    NotFound { kind: ObjectKind, id: String },
47    #[error("{kind:?} '{id}' already exists")]
48    Exists { kind: ObjectKind, id: String },
49    #[error("Wrong field value: {0}")]
50    WrongFieldValue(UnknownEnumValue),
51    #[error("Could not add certificate: {0}")]
52    AddCertificate(CertificateError),
53    #[error("Could not remove certificate: {0}")]
54    RemoveCertificate(String),
55    #[error("Could not replace certificate: {0}")]
56    ReplaceCertificate(String),
57    #[error(
58        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
59    )]
60    FrontendConversion { frontend: String, error: String },
61    #[error("Could not write state to file: {0}")]
62    FileError(std::io::Error),
63    #[error("Invalid value for field '{field}': {reason}")]
64    InvalidValue {
65        field: &'static str,
66        reason: &'static str,
67    },
68}
69
70/// The `ConfigState` represents the state of Sōzu's business, which is to forward traffic
71/// from frontends to backends. Hence, it contains all details about:
72///
73/// - listeners (socket addresses, for TCP and HTTP connections)
74/// - frontends (bind to a listener)
75/// - backends (to forward connections to)
76/// - clusters (routing rules from frontends to backends)
77/// - TLS certificates
78#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
79pub struct ConfigState {
80    pub clusters: BTreeMap<ClusterId, Cluster>,
81    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
82    /// socket address -> HTTP listener
83    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
84    /// socket address -> HTTPS listener
85    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
86    /// socket address -> TCP listener
87    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
88    /// HTTP frontends, indexed by a summary of each front's address;hostname;path, for uniqueness.
89    /// For example: `"0.0.0.0:8080;lolcatho.st;P/api"`
90    pub http_fronts: BTreeMap<String, HttpFrontend>,
91    /// indexed by (address, hostname, path)
92    pub https_fronts: BTreeMap<String, HttpFrontend>,
93    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
94    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
95    /// A census of requests that were received. Name of the request -> number of occurences
96    pub request_counts: BTreeMap<String, i32>,
97}
98
99impl ConfigState {
100    pub fn new() -> Self {
101        Self::default()
102    }
103
104    pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
105        let request_type = match &request.request_type {
106            Some(t) => t,
107            None => return Err(StateError::EmptyRequest),
108        };
109
110        self.increment_request_count(request);
111
112        match request_type {
113            RequestType::AddCluster(cluster) => self.add_cluster(cluster),
114            RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
115            RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
116            RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
117            RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
118            RequestType::RemoveListener(remove) => self.remove_listener(remove),
119            RequestType::ActivateListener(activate) => self.activate_listener(activate),
120            RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
121            RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
122            RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
123            RequestType::AddCertificate(add) => self.add_certificate(add),
124            RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
125            RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
126            RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
127            RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
128            RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
129            RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
130            RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
131            RequestType::RemoveBackend(backend) => self.remove_backend(backend),
132            RequestType::UpdateHttpListener(patch) => self.update_http_listener(patch),
133            RequestType::UpdateHttpsListener(patch) => self.update_https_listener(patch),
134            RequestType::UpdateTcpListener(patch) => self.update_tcp_listener(patch),
135            RequestType::SetHealthCheck(set) => self.set_health_check(set),
136            RequestType::RemoveHealthCheck(cluster_id) => self.remove_health_check(cluster_id),
137
138            // This is to avoid the error message. These request types are
139            // worker-only / runtime-only and do not affect the persisted
140            // ConfigState (e.g., a worker-side global limit set via
141            // SetMaxConnectionsPerIp does NOT survive a worker restart;
142            // operators must mirror the change in the TOML to make it
143            // sticky).
144            RequestType::Logging(_)
145            | RequestType::CountRequests(_)
146            | RequestType::Status(_)
147            | RequestType::SoftStop(_)
148            | RequestType::QueryCertificatesFromWorkers(_)
149            | RequestType::QueryClusterById(_)
150            | RequestType::QueryClustersByDomain(_)
151            | RequestType::QueryMetrics(_)
152            | RequestType::QueryClustersHashes(_)
153            | RequestType::ConfigureMetrics(_)
154            | RequestType::SetMetricDetail(_)
155            | RequestType::ReturnListenSockets(_)
156            | RequestType::SetMaxConnectionsPerIp(_)
157            | RequestType::QueryMaxConnectionsPerIp(_)
158            | RequestType::HardStop(_) => Ok(()),
159
160            _other_request => Err(StateError::UndispatchableRequest),
161        }
162    }
163
164    /// Increments the count for this request type
165    fn increment_request_count(&mut self, request: &Request) {
166        if let Some(request_type) = &request.request_type {
167            let count = self
168                .request_counts
169                .entry(format_request_type(request_type).to_owned())
170                .or_insert(1);
171            *count += 1;
172        }
173    }
174
175    pub fn get_request_counts(&self) -> RequestCounts {
176        RequestCounts {
177            map: self.request_counts.clone(),
178        }
179    }
180
181    fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
182        // Validate any inline `cluster.health_check` before mutating state so
183        // an invalid config (zero thresholds, missing leading `/`, CR/LF/NUL/C0
184        // in URI) cannot ride in via the AddCluster path. Without this, TOML
185        // reload, SaveState/LoadState, and direct API AddCluster requests
186        // bypass the SetHealthCheck-side check and let an attacker-controlled
187        // health-check URI smuggle CRLF into outbound HTTP/1.1 probes.
188        if let Some(hc) = cluster.health_check.as_ref() {
189            if let Err(reason) = crate::config::validate_health_check_config(hc) {
190                return Err(StateError::InvalidValue {
191                    field: "health_check",
192                    reason,
193                });
194            }
195        }
196        let cluster = cluster.clone();
197        self.clusters.insert(cluster.cluster_id.clone(), cluster);
198        Ok(())
199    }
200
201    fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
202        match self.clusters.remove(cluster_id) {
203            Some(_) => Ok(()),
204            None => Err(StateError::NotFound {
205                kind: ObjectKind::Cluster,
206                id: cluster_id.to_owned(),
207            }),
208        }
209    }
210
211    fn set_health_check(&mut self, set: &SetHealthCheck) -> Result<(), StateError> {
212        // Validate before mutating state so an invalid config (zero
213        // thresholds, missing leading `/`, CR/LF/NUL/C0 in URI) cannot
214        // round-trip through SaveState/LoadState. The worker also
215        // validates at the SetHealthCheck handler — this is the
216        // master-side mirror so off-channel TOML reload paths don't
217        // bypass the policy.
218        if let Err(reason) = crate::config::validate_health_check_config(&set.config) {
219            return Err(StateError::InvalidValue {
220                field: "health_check",
221                reason,
222            });
223        }
224        match self.clusters.get_mut(&set.cluster_id) {
225            Some(cluster) => {
226                cluster.health_check = Some(set.config.to_owned());
227                Ok(())
228            }
229            None => Err(StateError::NotFound {
230                kind: ObjectKind::Cluster,
231                id: set.cluster_id.to_owned(),
232            }),
233        }
234    }
235
236    fn remove_health_check(&mut self, cluster_id: &str) -> Result<(), StateError> {
237        match self.clusters.get_mut(cluster_id) {
238            Some(cluster) => {
239                cluster.health_check = None;
240                Ok(())
241            }
242            None => Err(StateError::NotFound {
243                kind: ObjectKind::Cluster,
244                id: cluster_id.to_owned(),
245            }),
246        }
247    }
248
249    pub fn list_health_checks(&self, cluster_id: Option<&str>) -> HealthChecksList {
250        let map = self
251            .clusters
252            .iter()
253            .filter(|(id, _)| cluster_id.is_none_or(|filter| filter == id.as_str()))
254            .filter_map(|(id, cluster)| {
255                cluster
256                    .health_check
257                    .as_ref()
258                    .map(|hc| (id.to_owned(), hc.to_owned()))
259            })
260            .collect();
261        HealthChecksList { map }
262    }
263
264    fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
265        let address: SocketAddr = listener.address.into();
266        match self.http_listeners.entry(address) {
267            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
268            BTreeMapEntry::Occupied(_) => {
269                return Err(StateError::Exists {
270                    kind: ObjectKind::HttpListener,
271                    id: address.to_string(),
272                });
273            }
274        };
275        Ok(())
276    }
277
278    fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
279        let address: SocketAddr = listener.address.into();
280        match self.https_listeners.entry(address) {
281            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
282            BTreeMapEntry::Occupied(_) => {
283                return Err(StateError::Exists {
284                    kind: ObjectKind::HttpsListener,
285                    id: address.to_string(),
286                });
287            }
288        };
289        Ok(())
290    }
291
292    fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
293        let address: SocketAddr = listener.address.into();
294        match self.tcp_listeners.entry(address) {
295            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
296            BTreeMapEntry::Occupied(_) => {
297                return Err(StateError::Exists {
298                    kind: ObjectKind::TcpListener,
299                    id: address.to_string(),
300                });
301            }
302        };
303        Ok(())
304    }
305
306    fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
307        match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
308            ListenerType::Http => self.remove_http_listener(&remove.address.into()),
309            ListenerType::Https => self.remove_https_listener(&remove.address.into()),
310            ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
311        }
312    }
313
314    fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
315        if self.http_listeners.remove(address).is_none() {
316            return Err(StateError::NoChange);
317        }
318        Ok(())
319    }
320
321    fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
322        if self.https_listeners.remove(address).is_none() {
323            return Err(StateError::NoChange);
324        }
325        Ok(())
326    }
327
328    fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
329        if self.tcp_listeners.remove(address).is_none() {
330            return Err(StateError::NoChange);
331        }
332        Ok(())
333    }
334
335    /// Validate and apply a partial patch to an existing HTTP listener.
336    ///
337    /// Only `Some` fields in the patch are written; `None` fields preserve the
338    /// current value. Returns `StateError::NotFound` if the address is unknown,
339    /// `StateError::InvalidValue` if a flood-knob value is below the required
340    /// minimum.
341    fn update_http_listener(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
342        validate_h2_flood_knobs_http(patch)?;
343
344        let address: SocketAddr = patch.address.into();
345        let listener =
346            self.http_listeners
347                .get_mut(&address)
348                .ok_or_else(|| StateError::NotFound {
349                    kind: ObjectKind::HttpListener,
350                    id: address.to_string(),
351                })?;
352
353        // Shared session-at-accept / per-connection knobs
354        if let Some(v) = patch.public_address {
355            listener.public_address = Some(v);
356        }
357        if let Some(v) = patch.expect_proxy {
358            listener.expect_proxy = v;
359        }
360        if let Some(ref v) = patch.sticky_name {
361            listener.sticky_name = v.to_owned();
362        }
363        if let Some(v) = patch.front_timeout {
364            listener.front_timeout = v;
365        }
366        if let Some(v) = patch.back_timeout {
367            listener.back_timeout = v;
368        }
369        if let Some(v) = patch.connect_timeout {
370            listener.connect_timeout = v;
371        }
372        if let Some(v) = patch.request_timeout {
373            listener.request_timeout = v;
374        }
375        if let Some(patch_answers) = patch.http_answers.as_ref() {
376            merge_custom_http_answers(&mut listener.http_answers, patch_answers);
377        }
378        // H2 flood knobs
379        if let Some(v) = patch.h2_max_rst_stream_per_window {
380            listener.h2_max_rst_stream_per_window = Some(v);
381        }
382        if let Some(v) = patch.h2_max_ping_per_window {
383            listener.h2_max_ping_per_window = Some(v);
384        }
385        if let Some(v) = patch.h2_max_settings_per_window {
386            listener.h2_max_settings_per_window = Some(v);
387        }
388        if let Some(v) = patch.h2_max_empty_data_per_window {
389            listener.h2_max_empty_data_per_window = Some(v);
390        }
391        if let Some(v) = patch.h2_max_continuation_frames {
392            listener.h2_max_continuation_frames = Some(v);
393        }
394        if let Some(v) = patch.h2_max_glitch_count {
395            listener.h2_max_glitch_count = Some(v);
396        }
397        if let Some(v) = patch.h2_initial_connection_window {
398            listener.h2_initial_connection_window = Some(v);
399        }
400        if let Some(v) = patch.h2_max_concurrent_streams {
401            listener.h2_max_concurrent_streams = Some(v);
402        }
403        if let Some(v) = patch.h2_stream_shrink_ratio {
404            listener.h2_stream_shrink_ratio = Some(v);
405        }
406        if let Some(v) = patch.h2_max_rst_stream_lifetime {
407            listener.h2_max_rst_stream_lifetime = Some(v);
408        }
409        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
410            listener.h2_max_rst_stream_abusive_lifetime = Some(v);
411        }
412        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
413            listener.h2_max_rst_stream_emitted_lifetime = Some(v);
414        }
415        if let Some(v) = patch.h2_max_header_list_size {
416            listener.h2_max_header_list_size = Some(v);
417        }
418        if let Some(v) = patch.h2_max_header_table_size {
419            listener.h2_max_header_table_size = Some(v);
420        }
421        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
422            listener.h2_stream_idle_timeout_seconds = Some(v);
423        }
424        // 0 is valid for graceful_shutdown_deadline (means "wait forever")
425        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
426            listener.h2_graceful_shutdown_deadline_seconds = Some(v);
427        }
428        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
429            listener.h2_max_window_update_stream0_per_window = Some(v);
430        }
431        if let Some(ref v) = patch.sozu_id_header {
432            validate_sozu_id_header(v)?;
433            listener.sozu_id_header = Some(v.to_owned());
434        }
435        Ok(())
436    }
437
438    /// Validate and apply a partial patch to an existing HTTPS listener.
439    ///
440    /// Only `Some` fields in the patch are written; `None` fields preserve the
441    /// current value. Returns `StateError::NotFound` if the address is unknown,
442    /// `StateError::InvalidValue` if a flood-knob value is below the required
443    /// minimum or an ALPN value is unknown.
444    fn update_https_listener(
445        &mut self,
446        patch: &UpdateHttpsListenerConfig,
447    ) -> Result<(), StateError> {
448        validate_h2_flood_knobs_https(patch)?;
449
450        let address: SocketAddr = patch.address.into();
451        let listener =
452            self.https_listeners
453                .get_mut(&address)
454                .ok_or_else(|| StateError::NotFound {
455                    kind: ObjectKind::HttpsListener,
456                    id: address.to_string(),
457                })?;
458
459        // Shared session-at-accept / per-connection knobs
460        if let Some(v) = patch.public_address {
461            listener.public_address = Some(v);
462        }
463        if let Some(v) = patch.expect_proxy {
464            listener.expect_proxy = v;
465        }
466        if let Some(ref v) = patch.sticky_name {
467            listener.sticky_name = v.to_owned();
468        }
469        if let Some(v) = patch.front_timeout {
470            listener.front_timeout = v;
471        }
472        if let Some(v) = patch.back_timeout {
473            listener.back_timeout = v;
474        }
475        if let Some(v) = patch.connect_timeout {
476            listener.connect_timeout = v;
477        }
478        if let Some(v) = patch.request_timeout {
479            listener.request_timeout = v;
480        }
481        if let Some(patch_answers) = patch.http_answers.as_ref() {
482            merge_custom_http_answers(&mut listener.http_answers, patch_answers);
483        }
484        // HTTPS-only knobs
485        if let Some(ref alpn_wrapper) = patch.alpn_protocols {
486            validate_alpn_protocols(&alpn_wrapper.values)?;
487            // Empty values vec = reset to default (runtime treats empty as default)
488            listener.alpn_protocols = alpn_wrapper.values.clone();
489        }
490        if let Some(v) = patch.strict_sni_binding {
491            listener.strict_sni_binding = Some(v);
492        }
493        if let Some(v) = patch.disable_http11 {
494            listener.disable_http11 = Some(v);
495        }
496        // H2 flood knobs
497        if let Some(v) = patch.h2_max_rst_stream_per_window {
498            listener.h2_max_rst_stream_per_window = Some(v);
499        }
500        if let Some(v) = patch.h2_max_ping_per_window {
501            listener.h2_max_ping_per_window = Some(v);
502        }
503        if let Some(v) = patch.h2_max_settings_per_window {
504            listener.h2_max_settings_per_window = Some(v);
505        }
506        if let Some(v) = patch.h2_max_empty_data_per_window {
507            listener.h2_max_empty_data_per_window = Some(v);
508        }
509        if let Some(v) = patch.h2_max_continuation_frames {
510            listener.h2_max_continuation_frames = Some(v);
511        }
512        if let Some(v) = patch.h2_max_glitch_count {
513            listener.h2_max_glitch_count = Some(v);
514        }
515        if let Some(v) = patch.h2_initial_connection_window {
516            listener.h2_initial_connection_window = Some(v);
517        }
518        if let Some(v) = patch.h2_max_concurrent_streams {
519            listener.h2_max_concurrent_streams = Some(v);
520        }
521        if let Some(v) = patch.h2_stream_shrink_ratio {
522            listener.h2_stream_shrink_ratio = Some(v);
523        }
524        if let Some(v) = patch.h2_max_rst_stream_lifetime {
525            listener.h2_max_rst_stream_lifetime = Some(v);
526        }
527        if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
528            listener.h2_max_rst_stream_abusive_lifetime = Some(v);
529        }
530        if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
531            listener.h2_max_rst_stream_emitted_lifetime = Some(v);
532        }
533        if let Some(v) = patch.h2_max_header_list_size {
534            listener.h2_max_header_list_size = Some(v);
535        }
536        if let Some(v) = patch.h2_max_header_table_size {
537            listener.h2_max_header_table_size = Some(v);
538        }
539        if let Some(v) = patch.h2_stream_idle_timeout_seconds {
540            listener.h2_stream_idle_timeout_seconds = Some(v);
541        }
542        // 0 is valid for graceful_shutdown_deadline (means "wait forever")
543        if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
544            listener.h2_graceful_shutdown_deadline_seconds = Some(v);
545        }
546        if let Some(v) = patch.h2_max_window_update_stream0_per_window {
547            listener.h2_max_window_update_stream0_per_window = Some(v);
548        }
549        if let Some(ref v) = patch.sozu_id_header {
550            validate_sozu_id_header(v)?;
551            listener.sozu_id_header = Some(v.to_owned());
552        }
553        Ok(())
554    }
555
556    /// Validate and apply a partial patch to an existing TCP listener.
557    ///
558    /// Only `Some` fields in the patch are written; `None` fields preserve the
559    /// current value. Returns `StateError::NotFound` if the address is unknown.
560    fn update_tcp_listener(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), StateError> {
561        let address: SocketAddr = patch.address.into();
562        let listener =
563            self.tcp_listeners
564                .get_mut(&address)
565                .ok_or_else(|| StateError::NotFound {
566                    kind: ObjectKind::TcpListener,
567                    id: address.to_string(),
568                })?;
569
570        if let Some(v) = patch.public_address {
571            listener.public_address = Some(v);
572        }
573        if let Some(v) = patch.expect_proxy {
574            listener.expect_proxy = v;
575        }
576        if let Some(v) = patch.front_timeout {
577            listener.front_timeout = v;
578        }
579        if let Some(v) = patch.back_timeout {
580            listener.back_timeout = v;
581        }
582        if let Some(v) = patch.connect_timeout {
583            listener.connect_timeout = v;
584        }
585        Ok(())
586    }
587
588    fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
589        match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
590            ListenerType::Http => self
591                .http_listeners
592                .get_mut(&activate.address.into())
593                .map(|listener| listener.active = true)
594                .ok_or(StateError::NotFound {
595                    kind: ObjectKind::HttpListener,
596                    id: activate.address.to_string(),
597                }),
598            ListenerType::Https => self
599                .https_listeners
600                .get_mut(&activate.address.into())
601                .map(|listener| listener.active = true)
602                .ok_or(StateError::NotFound {
603                    kind: ObjectKind::HttpsListener,
604                    id: activate.address.to_string(),
605                }),
606            ListenerType::Tcp => self
607                .tcp_listeners
608                .get_mut(&activate.address.into())
609                .map(|listener| listener.active = true)
610                .ok_or(StateError::NotFound {
611                    kind: ObjectKind::TcpListener,
612                    id: activate.address.to_string(),
613                }),
614        }
615    }
616
617    fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
618        match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
619            ListenerType::Http => self
620                .http_listeners
621                .get_mut(&deactivate.address.into())
622                .map(|listener| listener.active = false)
623                .ok_or(StateError::NotFound {
624                    kind: ObjectKind::HttpListener,
625                    id: deactivate.address.to_string(),
626                }),
627            ListenerType::Https => self
628                .https_listeners
629                .get_mut(&deactivate.address.into())
630                .map(|listener| listener.active = false)
631                .ok_or(StateError::NotFound {
632                    kind: ObjectKind::HttpsListener,
633                    id: deactivate.address.to_string(),
634                }),
635            ListenerType::Tcp => self
636                .tcp_listeners
637                .get_mut(&deactivate.address.into())
638                .map(|listener| listener.active = false)
639                .ok_or(StateError::NotFound {
640                    kind: ObjectKind::TcpListener,
641                    id: deactivate.address.to_string(),
642                }),
643        }
644    }
645
646    fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
647        let front_as_key = front.to_string();
648
649        match self.http_fronts.entry(front.to_string()) {
650            BTreeMapEntry::Vacant(e) => {
651                e.insert(front.clone().to_frontend().map_err(|into_error| {
652                    StateError::FrontendConversion {
653                        frontend: front_as_key,
654                        error: into_error.to_string(),
655                    }
656                })?)
657            }
658            BTreeMapEntry::Occupied(_) => {
659                return Err(StateError::Exists {
660                    kind: ObjectKind::HttpFrontend,
661                    id: front.to_string(),
662                });
663            }
664        };
665        Ok(())
666    }
667
668    fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
669        let front_as_key = front.to_string();
670
671        match self.https_fronts.entry(front.to_string()) {
672            BTreeMapEntry::Vacant(e) => {
673                e.insert(front.clone().to_frontend().map_err(|into_error| {
674                    StateError::FrontendConversion {
675                        frontend: front_as_key,
676                        error: into_error.to_string(),
677                    }
678                })?)
679            }
680            BTreeMapEntry::Occupied(_) => {
681                return Err(StateError::Exists {
682                    kind: ObjectKind::HttpsFrontend,
683                    id: front.to_string(),
684                });
685            }
686        };
687        Ok(())
688    }
689
690    fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
691        self.http_fronts
692            .remove(&front.to_string())
693            .ok_or(StateError::NotFound {
694                kind: ObjectKind::HttpFrontend,
695                id: front.to_string(),
696            })?;
697        Ok(())
698    }
699
700    fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
701        self.https_fronts
702            .remove(&front.to_string())
703            .ok_or(StateError::NotFound {
704                kind: ObjectKind::HttpsFrontend,
705                id: front.to_string(),
706            })?;
707        Ok(())
708    }
709
710    fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
711        let fingerprint = add
712            .certificate
713            .fingerprint()
714            .map_err(StateError::AddCertificate)?;
715
716        let entry = self.certificates.entry(add.address.into()).or_default();
717
718        let mut add = add.clone();
719        add.certificate
720            .apply_overriding_names()
721            .map_err(StateError::AddCertificate)?;
722
723        if entry.contains_key(&fingerprint) {
724            info!(
725                "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
726                fingerprint,
727                add.certificate.names.join(", "),
728                add.address
729            );
730            return Ok(());
731        }
732
733        entry.insert(fingerprint, add.certificate);
734        Ok(())
735    }
736
737    fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
738        let fingerprint = Fingerprint(
739            hex::decode(&remove.fingerprint)
740                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
741        );
742
743        if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
744            index.remove(&fingerprint);
745        }
746
747        Ok(())
748    }
749
750    /// - Remove old certificate from certificates, using the old fingerprint
751    /// - calculate the new fingerprint
752    /// - insert the new certificate with the new fingerprint as key
753    /// - check that the new entry is present in the certificates hashmap
754    fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
755        let replace_address = replace.address.into();
756        let old_fingerprint = Fingerprint(
757            hex::decode(&replace.old_fingerprint)
758                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
759        );
760
761        self.certificates
762            .get_mut(&replace_address)
763            .ok_or(StateError::NotFound {
764                kind: ObjectKind::Certificate,
765                id: replace.address.to_string(),
766            })?
767            .remove(&old_fingerprint);
768
769        let new_fingerprint = Fingerprint(
770            calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
771                |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
772            )?,
773        );
774
775        self.certificates
776            .get_mut(&replace_address)
777            .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
778
779        if !self
780            .certificates
781            .get(&replace_address)
782            .ok_or(StateError::ReplaceCertificate(
783                "Unlikely error. This entry in the certificate hashmap should be present"
784                    .to_string(),
785            ))?
786            .contains_key(&new_fingerprint)
787        {
788            return Err(StateError::ReplaceCertificate(format!(
789                "Failed to insert the new certificate for address {}",
790                replace.address
791            )));
792        }
793        Ok(())
794    }
795
796    fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
797        let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
798
799        let tcp_frontend = TcpFrontend {
800            cluster_id: front.cluster_id.clone(),
801            address: front.address.into(),
802            tags: front.tags.clone(),
803        };
804        if tcp_frontends.contains(&tcp_frontend) {
805            return Err(StateError::Exists {
806                kind: ObjectKind::TcpFrontend,
807                id: format!("{tcp_frontend:?}"),
808            });
809        }
810
811        tcp_frontends.push(tcp_frontend);
812        Ok(())
813    }
814
815    fn remove_tcp_frontend(
816        &mut self,
817        front_to_remove: &RequestTcpFrontend,
818    ) -> Result<(), StateError> {
819        let tcp_frontends =
820            self.tcp_fronts
821                .get_mut(&front_to_remove.cluster_id)
822                .ok_or(StateError::NotFound {
823                    kind: ObjectKind::TcpFrontend,
824                    id: format!("{front_to_remove:?}"),
825                })?;
826
827        let len = tcp_frontends.len();
828        tcp_frontends.retain(|front| front.address != front_to_remove.address.into());
829        if tcp_frontends.len() == len {
830            return Err(StateError::NoChange);
831        }
832        Ok(())
833    }
834
835    fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
836        let backend = Backend {
837            address: add_backend.address.into(),
838            cluster_id: add_backend.cluster_id.clone(),
839            backend_id: add_backend.backend_id.clone(),
840            sticky_id: add_backend.sticky_id.clone(),
841            load_balancing_parameters: add_backend.load_balancing_parameters,
842            backup: add_backend.backup,
843        };
844        let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
845
846        // we might be modifying the sticky id or load balancing parameters
847        backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
848        backends.push(backend);
849        backends.sort();
850
851        Ok(())
852    }
853
854    fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
855        let backend_list =
856            self.backends
857                .get_mut(&backend.cluster_id)
858                .ok_or(StateError::NotFound {
859                    kind: ObjectKind::Backend,
860                    id: backend.backend_id.to_owned(),
861                })?;
862
863        let len = backend_list.len();
864        let remove_address = backend.address.into();
865        backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
866        backend_list.sort();
867        if backend_list.len() == len {
868            return Err(StateError::NoChange);
869        }
870        Ok(())
871    }
872
873    /// creates all requests needed to bootstrap the state
874    fn generate_requests(&self) -> Vec<Request> {
875        let mut v: Vec<Request> = Vec::new();
876
877        for listener in self.http_listeners.values() {
878            v.push(RequestType::AddHttpListener(listener.clone()).into());
879            if listener.active {
880                v.push(
881                    RequestType::ActivateListener(ActivateListener {
882                        address: listener.address,
883                        proxy: ListenerType::Http.into(),
884                        from_scm: false,
885                    })
886                    .into(),
887                );
888            }
889        }
890
891        for listener in self.https_listeners.values() {
892            v.push(RequestType::AddHttpsListener(listener.clone()).into());
893            if listener.active {
894                v.push(
895                    RequestType::ActivateListener(ActivateListener {
896                        address: listener.address,
897                        proxy: ListenerType::Https.into(),
898                        from_scm: false,
899                    })
900                    .into(),
901                );
902            }
903        }
904
905        for listener in self.tcp_listeners.values() {
906            v.push(RequestType::AddTcpListener(*listener).into());
907            if listener.active {
908                v.push(
909                    RequestType::ActivateListener(ActivateListener {
910                        address: listener.address,
911                        proxy: ListenerType::Tcp.into(),
912                        from_scm: false,
913                    })
914                    .into(),
915                );
916            }
917        }
918
919        for cluster in self.clusters.values() {
920            v.push(RequestType::AddCluster(cluster.clone()).into());
921        }
922
923        for front in self.http_fronts.values() {
924            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
925        }
926
927        for (front, certs) in self.certificates.iter() {
928            for certificate_and_key in certs.values() {
929                v.push(
930                    RequestType::AddCertificate(AddCertificate {
931                        address: SocketAddress::from(*front),
932                        certificate: certificate_and_key.clone(),
933                        expired_at: None,
934                    })
935                    .into(),
936                );
937            }
938        }
939
940        for front in self.https_fronts.values() {
941            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
942        }
943
944        for front_list in self.tcp_fronts.values() {
945            for front in front_list {
946                v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
947            }
948        }
949
950        for backend_list in self.backends.values() {
951            for backend in backend_list {
952                v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
953            }
954        }
955
956        v
957    }
958
959    pub fn generate_activate_requests(&self) -> Vec<Request> {
960        let mut v: Vec<Request> = Vec::new();
961        for front in self
962            .http_listeners
963            .iter()
964            .filter(|(_, listener)| listener.active)
965            .map(|(k, _)| k)
966        {
967            v.push(
968                RequestType::ActivateListener(ActivateListener {
969                    address: SocketAddress::from(*front),
970                    proxy: ListenerType::Http.into(),
971                    from_scm: false,
972                })
973                .into(),
974            );
975        }
976
977        for front in self
978            .https_listeners
979            .iter()
980            .filter(|(_, listener)| listener.active)
981            .map(|(k, _)| k)
982        {
983            v.push(
984                RequestType::ActivateListener(ActivateListener {
985                    address: SocketAddress::from(*front),
986                    proxy: ListenerType::Https.into(),
987                    from_scm: false,
988                })
989                .into(),
990            );
991        }
992        for front in self
993            .tcp_listeners
994            .iter()
995            .filter(|(_, listener)| listener.active)
996            .map(|(k, _)| k)
997        {
998            v.push(
999                RequestType::ActivateListener(ActivateListener {
1000                    address: SocketAddress::from(*front),
1001                    proxy: ListenerType::Tcp.into(),
1002                    from_scm: false,
1003                })
1004                .into(),
1005            );
1006        }
1007
1008        v
1009    }
1010
    /// Computes the requests describing how to go from `self` to `other`.
    ///
    /// Items present in `self` but not in `other` produce removal requests, items present
    /// in `other` but not in `self` produce addition requests.
    ///
    /// Requests are pushed in this order: removed / added TCP listeners, removed / added
    /// HTTP listeners, removed / added HTTPS listeners, listeners present on both sides
    /// (TCP, HTTP, HTTPS), clusters, backends, HTTP frontends, HTTPS frontends,
    /// TCP frontends, certificates, and a last activation pass over added TCP listeners.
    ///
    /// # Panics
    ///
    /// The map indexing and the `unwrap()` calls only use keys taken from the maps they
    /// index, so they are not expected to panic.
    pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
        // listeners are compared by address: an address found on one side only is an
        // added or a removed listener
        let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
        let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
        let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
        let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);

        let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
        let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
        let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
        let added_http_listeners = their_http_listeners.difference(&my_http_listeners);

        let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
        let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
        let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
        let added_https_listeners = their_https_listeners.difference(&my_https_listeners);

        let mut v: Vec<Request> = vec![];

        // a removed listener is deactivated first (if it was active), then removed
        for address in removed_tcp_listeners {
            if self.tcp_listeners[*address].active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Tcp.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            v.push(
                RequestType::RemoveListener(RemoveListener {
                    address: SocketAddress::from(**address),
                    proxy: ListenerType::Tcp.into(),
                })
                .into(),
            );
        }

        // an added listener is added, then activated if `other` has it active;
        // the iterator is cloned because it is walked a second time at the end of this function
        for address in added_tcp_listeners.clone() {
            v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());

            if other.tcp_listeners[*address].active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Tcp.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        for address in removed_http_listeners {
            if self.http_listeners[*address].active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Http.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            v.push(
                RequestType::RemoveListener(RemoveListener {
                    address: SocketAddress::from(**address),
                    proxy: ListenerType::Http.into(),
                })
                .into(),
            );
        }

        for address in added_http_listeners.clone() {
            v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());

            if other.http_listeners[*address].active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Http.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        for address in removed_https_listeners {
            if self.https_listeners[*address].active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Https.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            v.push(
                RequestType::RemoveListener(RemoveListener {
                    address: SocketAddress::from(**address),
                    proxy: ListenerType::Https.into(),
                })
                .into(),
            );
        }

        for address in added_https_listeners.clone() {
            v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());

            if other.https_listeners[*address].active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**address),
                        proxy: ListenerType::Https.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        // listeners present on both sides: a listener whose configuration differs is
        // removed and added again as inactive, then the `active` flags are reconciled.
        // NOTE(review): when the configuration differs and the listener is active on both
        // sides, neither condition below matches, so no ActivateListener follows the
        // inactive re-add. Confirm this is intended. The same holds for HTTP and HTTPS.
        for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
            let my_listener = &self.tcp_listeners[*addr];
            let their_listener = &other.tcp_listeners[*addr];

            if my_listener != their_listener {
                v.push(
                    RequestType::RemoveListener(RemoveListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Tcp.into(),
                    })
                    .into(),
                );
                // any added listener should be inactive
                let mut listener_to_add = *their_listener;
                listener_to_add.active = false;
                v.push(RequestType::AddTcpListener(listener_to_add).into());
            }

            if my_listener.active && !their_listener.active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Tcp.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            if !my_listener.active && their_listener.active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Tcp.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        for addr in my_http_listeners.intersection(&their_http_listeners) {
            let my_listener = &self.http_listeners[*addr];
            let their_listener = &other.http_listeners[*addr];

            if my_listener != their_listener {
                v.push(
                    RequestType::RemoveListener(RemoveListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Http.into(),
                    })
                    .into(),
                );
                // any added listener should be inactive
                let mut listener_to_add = their_listener.clone();
                listener_to_add.active = false;
                v.push(RequestType::AddHttpListener(listener_to_add).into());
            }

            if my_listener.active && !their_listener.active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Http.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            if !my_listener.active && their_listener.active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Http.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        for addr in my_https_listeners.intersection(&their_https_listeners) {
            let my_listener = &self.https_listeners[*addr];
            let their_listener = &other.https_listeners[*addr];

            if my_listener != their_listener {
                v.push(
                    RequestType::RemoveListener(RemoveListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Https.into(),
                    })
                    .into(),
                );
                // any added listener should be inactive
                let mut listener_to_add = their_listener.clone();
                listener_to_add.active = false;
                v.push(RequestType::AddHttpsListener(listener_to_add).into());
            }

            if my_listener.active && !their_listener.active {
                v.push(
                    RequestType::DeactivateListener(DeactivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Https.into(),
                        to_scm: false,
                    })
                    .into(),
                );
            }

            if !my_listener.active && their_listener.active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: SocketAddress::from(**addr),
                        proxy: ListenerType::Https.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        // clusters: an added or a changed cluster is sent as AddCluster with the
        // definition found in `other`
        for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
            match res {
                DiffResult::Added | DiffResult::Changed => v.push(
                    RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
                ),
                DiffResult::Removed => {
                    v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
                }
            }
        }

        // backends are flattened and keyed by (cluster id, backend id)
        for ((cluster_id, backend_id), res) in diff_map(
            self.backends.iter().flat_map(|(cluster_id, v)| {
                v.iter()
                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
            }),
            other.backends.iter().flat_map(|(cluster_id, v)| {
                v.iter()
                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
            }),
        ) {
            match res {
                DiffResult::Added => {
                    let backend = other
                        .backends
                        .get(cluster_id)
                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
                        .unwrap();
                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
                }
                DiffResult::Removed => {
                    let backend = self
                        .backends
                        .get(cluster_id)
                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
                        .unwrap();

                    v.push(
                        RequestType::RemoveBackend(RemoveBackend {
                            cluster_id: backend.cluster_id.clone(),
                            backend_id: backend.backend_id.clone(),
                            address: SocketAddress::from(backend.address),
                        })
                        .into(),
                    );
                }
                // a changed backend is removed using our definition, then added again
                // using theirs
                DiffResult::Changed => {
                    let backend = self
                        .backends
                        .get(cluster_id)
                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
                        .unwrap();

                    v.push(
                        RequestType::RemoveBackend(RemoveBackend {
                            cluster_id: backend.cluster_id.clone(),
                            backend_id: backend.backend_id.clone(),
                            address: SocketAddress::from(backend.address),
                        })
                        .into(),
                    );

                    let backend = other
                        .backends
                        .get(cluster_id)
                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
                        .unwrap();
                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
                }
            }
        }

        // frontends are compared as (key, value) pairs: a frontend whose value changed
        // under the same key shows up both as removed and as added
        let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
        for (route, front) in self.http_fronts.iter() {
            my_http_fronts.insert((route, front));
        }
        let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
        for (route, front) in other.http_fronts.iter() {
            their_http_fronts.insert((route, front));
        }

        let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
        let added_http_fronts = their_http_fronts.difference(&my_http_fronts);

        for &(_, front) in removed_http_fronts {
            v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
        }

        for &(_, front) in added_http_fronts {
            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
        }

        let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
        for (route, front) in self.https_fronts.iter() {
            my_https_fronts.insert((route, front));
        }
        let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
        for (route, front) in other.https_fronts.iter() {
            their_https_fronts.insert((route, front));
        }
        let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
        let added_https_fronts = their_https_fronts.difference(&my_https_fronts);

        for &(_, front) in removed_https_fronts {
            v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
        }

        for &(_, front) in added_https_fronts {
            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
        }

        let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
        for (cluster_id, front_list) in self.tcp_fronts.iter() {
            for front in front_list.iter() {
                my_tcp_fronts.insert((cluster_id, front));
            }
        }
        let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
        for (cluster_id, front_list) in other.tcp_fronts.iter() {
            for front in front_list.iter() {
                their_tcp_fronts.insert((cluster_id, front));
            }
        }

        let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
        let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);

        for &(_, front) in removed_tcp_fronts {
            v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
        }

        for &(_, front) in added_tcp_fronts {
            v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
        }

        // certificates are compared as (listener address, fingerprint) pairs
        let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
            self.certificates
                .iter()
                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
        );
        let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
            other
                .certificates
                .iter()
                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
        );

        let removed_certificates = my_certificates.difference(&their_certificates);
        let added_certificates = their_certificates.difference(&my_certificates);

        for &(address, fingerprint) in removed_certificates {
            v.push(
                RequestType::RemoveCertificate(RemoveCertificate {
                    address: SocketAddress::from(address),
                    fingerprint: fingerprint.to_string(),
                })
                .into(),
            );
        }

        for &(address, fingerprint) in added_certificates {
            if let Some(certificate_and_key) = other
                .certificates
                .get(&address)
                .and_then(|certs| certs.get(fingerprint))
            {
                v.push(
                    RequestType::AddCertificate(AddCertificate {
                        address: SocketAddress::from(address),
                        certificate: certificate_and_key.clone(),
                        expired_at: None,
                    })
                    .into(),
                );
            }
        }

        // NOTE(review): added TCP listeners that are active already got an
        // ActivateListener in the first `added_tcp_listeners` loop above, so this pass
        // pushes a second one for each of them. HTTP and HTTPS listeners have no such
        // second pass. Confirm whether this duplicate is intended.
        for address in added_tcp_listeners {
            let listener = &other.tcp_listeners[*address];
            if listener.active {
                v.push(
                    RequestType::ActivateListener(ActivateListener {
                        address: listener.address,
                        proxy: ListenerType::Tcp.into(),
                        from_scm: false,
                    })
                    .into(),
                );
            }
        }

        v
    }
1455
1456    // FIXME: what about deny rules?
1457    pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
1458        let mut hm: HashMap<ClusterId, DefaultHasher> = self
1459            .clusters
1460            .keys()
1461            .map(|cluster_id| {
1462                let mut hasher = DefaultHasher::new();
1463                self.clusters.get(cluster_id).hash(&mut hasher);
1464                if let Some(backends) = self.backends.get(cluster_id) {
1465                    backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1466                }
1467                if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
1468                    tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1469                }
1470                (cluster_id.to_owned(), hasher)
1471            })
1472            .collect();
1473
1474        for front in self.http_fronts.values() {
1475            if let Some(cluster_id) = &front.cluster_id {
1476                if let Some(hasher) = hm.get_mut(cluster_id) {
1477                    front.hash(hasher);
1478                }
1479            }
1480        }
1481
1482        for front in self.https_fronts.values() {
1483            if let Some(cluster_id) = &front.cluster_id {
1484                if let Some(hasher) = hm.get_mut(cluster_id) {
1485                    front.hash(hasher);
1486                }
1487            }
1488        }
1489
1490        hm.drain()
1491            .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
1492            .collect()
1493    }
1494
1495    /// Gives details about a given cluster.
1496    /// Types like `HttpFrontend` are converted into protobuf ones, like `RequestHttpFrontend`
1497    pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
1498        let configuration = self.clusters.get(cluster_id).cloned()?;
1499        info!("{:?}", configuration);
1500
1501        let http_frontends: Vec<RequestHttpFrontend> = self
1502            .http_fronts
1503            .values()
1504            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1505            .map(|front| front.clone().into())
1506            .collect();
1507
1508        let https_frontends: Vec<RequestHttpFrontend> = self
1509            .https_fronts
1510            .values()
1511            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1512            .map(|front| front.clone().into())
1513            .collect();
1514
1515        let tcp_frontends: Vec<RequestTcpFrontend> = self
1516            .tcp_fronts
1517            .get(cluster_id)
1518            .cloned()
1519            .unwrap_or_default()
1520            .iter()
1521            .map(|front| front.clone().into())
1522            .collect();
1523
1524        let backends: Vec<AddBackend> = self
1525            .backends
1526            .get(cluster_id)
1527            .cloned()
1528            .unwrap_or_default()
1529            .iter()
1530            .map(|backend| backend.clone().into())
1531            .collect();
1532
1533        Some(ClusterInformation {
1534            configuration: Some(configuration),
1535            http_frontends,
1536            https_frontends,
1537            tcp_frontends,
1538            backends,
1539        })
1540    }
1541
1542    pub fn count_backends(&self) -> usize {
1543        self.backends.values().fold(0, |acc, v| acc + v.len())
1544    }
1545
1546    pub fn count_frontends(&self) -> usize {
1547        self.http_fronts.values().count()
1548            + self.https_fronts.values().count()
1549            + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
1550    }
1551
1552    pub fn get_cluster_ids_by_domain(
1553        &self,
1554        hostname: String,
1555        path: Option<String>,
1556    ) -> HashSet<ClusterId> {
1557        let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
1558
1559        self.http_fronts.values().for_each(|front| {
1560            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1561                if let Some(id) = &front.cluster_id {
1562                    cluster_ids.insert(id.to_string());
1563                }
1564            }
1565        });
1566
1567        self.https_fronts.values().for_each(|front| {
1568            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1569                if let Some(id) = &front.cluster_id {
1570                    cluster_ids.insert(id.to_string());
1571                }
1572            }
1573        });
1574
1575        cluster_ids
1576    }
1577
1578    pub fn get_certificates(
1579        &self,
1580        filters: QueryCertificatesFilters,
1581    ) -> BTreeMap<String, CertificateAndKey> {
1582        self.certificates
1583            .values()
1584            .flat_map(|hash_map| hash_map.iter())
1585            .filter(|(fingerprint, cert)| {
1586                if let Some(domain) = &filters.domain {
1587                    cert.names.contains(domain)
1588                } else if let Some(f) = &filters.fingerprint {
1589                    fingerprint.to_string() == *f
1590                } else {
1591                    true
1592                }
1593            })
1594            .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
1595            .collect()
1596    }
1597
1598    pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
1599        // if no http / https / tcp filter is provided, list all of them
1600        let list_all = !filters.http && !filters.https && !filters.tcp;
1601
1602        let mut listed_frontends = ListedFrontends::default();
1603
1604        if filters.http || list_all {
1605            for http_frontend in self.http_fronts.iter().filter(|f| {
1606                if let Some(domain) = &filters.domain {
1607                    f.1.hostname.contains(domain)
1608                } else {
1609                    true
1610                }
1611            }) {
1612                listed_frontends
1613                    .http_frontends
1614                    .push(http_frontend.1.to_owned().into());
1615            }
1616        }
1617
1618        if filters.https || list_all {
1619            for https_frontend in self.https_fronts.iter().filter(|f| {
1620                if let Some(domain) = &filters.domain {
1621                    f.1.hostname.contains(domain)
1622                } else {
1623                    true
1624                }
1625            }) {
1626                listed_frontends
1627                    .https_frontends
1628                    .push(https_frontend.1.to_owned().into());
1629            }
1630        }
1631
1632        if (filters.tcp || list_all) && filters.domain.is_none() {
1633            for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
1634                listed_frontends
1635                    .tcp_frontends
1636                    .push(tcp_frontend.to_owned().into())
1637            }
1638        }
1639
1640        listed_frontends
1641    }
1642
1643    pub fn list_listeners(&self) -> ListenersList {
1644        ListenersList {
1645            http_listeners: self
1646                .http_listeners
1647                .iter()
1648                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1649                .collect(),
1650            https_listeners: self
1651                .https_listeners
1652                .iter()
1653                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1654                .collect(),
1655            tcp_listeners: self
1656                .tcp_listeners
1657                .iter()
1658                .map(|(addr, listener)| (addr.to_string(), *listener))
1659                .collect(),
1660        }
1661    }
1662
1663    // create requests needed for a worker to recreate the state
1664    pub fn produce_initial_state(&self) -> InitialState {
1665        let mut worker_requests = Vec::new();
1666        for (counter, request) in self.generate_requests().into_iter().enumerate() {
1667            worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
1668        }
1669        InitialState {
1670            requests: worker_requests,
1671        }
1672    }
1673
1674    /// generate requests necessary to recreate the state,
1675    /// in protobuf, to a temp file
1676    pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1677        let initial_state = self.produce_initial_state();
1678        let count = initial_state.requests.len();
1679
1680        let bytes_to_write = initial_state.encode_to_vec();
1681        println!("writing {} in the temp file", bytes_to_write.len());
1682        file.write_all(&bytes_to_write)
1683            .map_err(StateError::FileError)?;
1684
1685        file.sync_all().map_err(StateError::FileError)?;
1686
1687        Ok(count)
1688    }
1689
1690    /// generate requests necessary to recreate the state,
1691    /// write them in a JSON form in a file, separated by \n\0,
1692    /// returns the number of written requests
1693    pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1694        let mut counter = 0usize;
1695        let requests = self.generate_requests();
1696
1697        for request in requests {
1698            let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
1699
1700            file.write_all(
1701                &serde_json::to_string(&message)
1702                    .map(|s| s.into_bytes())
1703                    .unwrap_or_default(),
1704            )
1705            .map_err(StateError::FileError)?;
1706
1707            file.write_all(&b"\n\0"[..])
1708                .map_err(StateError::FileError)?;
1709
1710            if counter % 1000 == 0 {
1711                info!("writing {} commands to file", counter);
1712                file.sync_all().map_err(StateError::FileError)?;
1713            }
1714            counter += 1;
1715        }
1716        file.sync_all().map_err(StateError::FileError)?;
1717
1718        Ok(counter)
1719    }
1720}
1721
1722/// Validate all H2 flood knobs in an HTTP listener patch.
1723///
1724/// Every flood-detector knob (including stream-0 WINDOW_UPDATE) requires a
1725/// value `>= 1`. Passing `0` would disable the detector entirely and leave the
1726/// proxy open to CVE-2023-44487 and related attacks. The runtime constructor
1727/// `H2FloodConfig::new()` applies the same `.max(1)` clamping, but a raw
1728/// protobuf client can bypass the CLI layer, so we enforce the bound here too.
1729///
1730/// `h2_max_concurrent_streams` and `h2_stream_shrink_ratio` are connection-
1731/// config knobs that also require `>= 1`.
1732///
1733/// `h2_graceful_shutdown_deadline_seconds = 0` is intentionally **allowed** —
1734/// it means "wait forever (no forced close after GOAWAY)".
1735pub fn validate_h2_flood_knobs_http(patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
1736    macro_rules! require_ge1 {
1737        ($field:expr, $name:literal) => {
1738            if let Some(0) = $field {
1739                return Err(StateError::InvalidValue {
1740                    field: $name,
1741                    reason: "must be >= 1",
1742                });
1743            }
1744        };
1745    }
1746    require_ge1!(
1747        patch.h2_max_rst_stream_per_window,
1748        "h2_max_rst_stream_per_window"
1749    );
1750    require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
1751    require_ge1!(
1752        patch.h2_max_settings_per_window,
1753        "h2_max_settings_per_window"
1754    );
1755    require_ge1!(
1756        patch.h2_max_empty_data_per_window,
1757        "h2_max_empty_data_per_window"
1758    );
1759    require_ge1!(
1760        patch.h2_max_continuation_frames,
1761        "h2_max_continuation_frames"
1762    );
1763    require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
1764    require_ge1!(
1765        patch.h2_max_window_update_stream0_per_window,
1766        "h2_max_window_update_stream0_per_window"
1767    );
1768    require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
1769    // Shrink ratio runtime floor is 2 (lib/src/protocol/mux/h2.rs ~448 .max(2));
1770    // anything lower is silently promoted so reject at control plane.
1771    if let Some(v) = patch.h2_stream_shrink_ratio {
1772        if v < 2 {
1773            return Err(StateError::InvalidValue {
1774                field: "h2_stream_shrink_ratio",
1775                reason: "must be >= 2",
1776            });
1777        }
1778    }
1779    // Lifetime caps and HPACK limits — must be >= 1 or the runtime trips on the
1780    // first qualifying frame. doc/configure.md advertises "u64 (>= 1)" etc.
1781    require_ge1!(
1782        patch.h2_max_rst_stream_lifetime,
1783        "h2_max_rst_stream_lifetime"
1784    );
1785    require_ge1!(
1786        patch.h2_max_rst_stream_abusive_lifetime,
1787        "h2_max_rst_stream_abusive_lifetime"
1788    );
1789    require_ge1!(
1790        patch.h2_max_rst_stream_emitted_lifetime,
1791        "h2_max_rst_stream_emitted_lifetime"
1792    );
1793    require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
1794    require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
1795    Ok(())
1796}
1797
1798/// Validate all H2 flood knobs in an HTTPS listener patch (same rules as HTTP).
1799pub fn validate_h2_flood_knobs_https(patch: &UpdateHttpsListenerConfig) -> Result<(), StateError> {
1800    macro_rules! require_ge1 {
1801        ($field:expr, $name:literal) => {
1802            if let Some(0) = $field {
1803                return Err(StateError::InvalidValue {
1804                    field: $name,
1805                    reason: "must be >= 1",
1806                });
1807            }
1808        };
1809    }
1810    require_ge1!(
1811        patch.h2_max_rst_stream_per_window,
1812        "h2_max_rst_stream_per_window"
1813    );
1814    require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
1815    require_ge1!(
1816        patch.h2_max_settings_per_window,
1817        "h2_max_settings_per_window"
1818    );
1819    require_ge1!(
1820        patch.h2_max_empty_data_per_window,
1821        "h2_max_empty_data_per_window"
1822    );
1823    require_ge1!(
1824        patch.h2_max_continuation_frames,
1825        "h2_max_continuation_frames"
1826    );
1827    require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
1828    require_ge1!(
1829        patch.h2_max_window_update_stream0_per_window,
1830        "h2_max_window_update_stream0_per_window"
1831    );
1832    require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
1833    if let Some(v) = patch.h2_stream_shrink_ratio {
1834        if v < 2 {
1835            return Err(StateError::InvalidValue {
1836                field: "h2_stream_shrink_ratio",
1837                reason: "must be >= 2",
1838            });
1839        }
1840    }
1841    require_ge1!(
1842        patch.h2_max_rst_stream_lifetime,
1843        "h2_max_rst_stream_lifetime"
1844    );
1845    require_ge1!(
1846        patch.h2_max_rst_stream_abusive_lifetime,
1847        "h2_max_rst_stream_abusive_lifetime"
1848    );
1849    require_ge1!(
1850        patch.h2_max_rst_stream_emitted_lifetime,
1851        "h2_max_rst_stream_emitted_lifetime"
1852    );
1853    require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
1854    require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
1855    Ok(())
1856}
1857
1858/// Validate a `sozu_id_header` value against RFC 9110 §5.1 header-name grammar.
1859///
1860/// Rejects empty strings and strings containing CR, LF, colon, space, or tab —
1861/// a conservative approximation of the token grammar that covers all practical
1862/// injection vectors without a full RFC 9110 tokenizer.
1863/// Merge a `CustomHttpAnswers` patch into the listener's stored answers,
1864/// preserving any field not present in the patch.
1865///
1866/// Field-mask semantic: `None` in `patch` means "preserve", `Some` means
1867/// "replace". A no-op patch (all-None) leaves `target` untouched. If `target`
1868/// is currently `None`, initialize it from the patch (any `None` field stays
1869/// `None` so hot-upgrade replay sees the same partial state).
1870pub fn merge_custom_http_answers(
1871    target: &mut Option<CustomHttpAnswers>,
1872    patch: &CustomHttpAnswers,
1873) {
1874    let current = target.get_or_insert_with(CustomHttpAnswers::default);
1875    macro_rules! merge_field {
1876        ($field:ident) => {
1877            if let Some(ref v) = patch.$field {
1878                current.$field = Some(v.clone());
1879            }
1880        };
1881    }
1882    merge_field!(answer_301);
1883    merge_field!(answer_400);
1884    merge_field!(answer_401);
1885    merge_field!(answer_404);
1886    merge_field!(answer_408);
1887    merge_field!(answer_413);
1888    merge_field!(answer_421);
1889    merge_field!(answer_502);
1890    merge_field!(answer_503);
1891    merge_field!(answer_504);
1892    merge_field!(answer_507);
1893}
1894
1895/// Validate an `AlpnProtocols` patch: each value must be "h2" or "http/1.1".
1896/// Empty values vec is allowed (reset-to-default).
1897pub fn validate_alpn_protocols(values: &[String]) -> Result<(), StateError> {
1898    for value in values {
1899        if value != "h2" && value != "http/1.1" {
1900            return Err(StateError::InvalidValue {
1901                field: "alpn_protocols",
1902                reason: "each value must be \"h2\" or \"http/1.1\"",
1903            });
1904        }
1905    }
1906    Ok(())
1907}
1908
1909/// Validate a `sozu_id_header` value against the RFC 9110 §5.1 `token` grammar:
1910///
1911/// ```text
1912/// token  = 1*tchar
1913/// tchar  = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
1914///          "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
1915/// ```
1916///
1917/// Rejects empty strings, non-ASCII bytes, controls (including CR/LF/tab),
1918/// separators (including colon and space), and any other non-`tchar` byte.
1919pub fn validate_sozu_id_header(value: &str) -> Result<(), StateError> {
1920    if value.is_empty() {
1921        return Err(StateError::InvalidValue {
1922            field: "sozu_id_header",
1923            reason: "must not be empty",
1924        });
1925    }
1926    for b in value.bytes() {
1927        let is_tchar = b.is_ascii_alphanumeric()
1928            || matches!(
1929                b,
1930                b'!' | b'#'
1931                    | b'$'
1932                    | b'%'
1933                    | b'&'
1934                    | b'\''
1935                    | b'*'
1936                    | b'+'
1937                    | b'-'
1938                    | b'.'
1939                    | b'^'
1940                    | b'_'
1941                    | b'`'
1942                    | b'|'
1943                    | b'~'
1944            );
1945        if !is_tchar {
1946            return Err(StateError::InvalidValue {
1947                field: "sozu_id_header",
1948                reason: "must be a valid HTTP header name (RFC 9110 §5.1 token: alphanumeric or one of !#$%&'*+-.^_`|~)",
1949            });
1950        }
1951    }
1952    Ok(())
1953}
1954
1955fn domain_check(
1956    front_hostname: &str,
1957    front_path_rule: &PathRule,
1958    hostname: &str,
1959    path_prefix: &Option<String>,
1960) -> bool {
1961    if hostname != front_hostname {
1962        return false;
1963    }
1964
1965    if let Some(path) = &path_prefix {
1966        return path == &front_path_rule.value;
1967    }
1968
1969    true
1970}
1971
/// Iterator state for comparing two key-sorted sequences of `(key, &value)`
/// pairs, built with [`diff_map`].
///
/// `my` and `other` each hold at most one pair that was pulled from its
/// iterator but not reported yet.
struct DiffMap<'a, K: Ord, V, I1, I2> {
    my_it: I1,
    other_it: I2,
    my: Option<(K, &'a V)>,
    other: Option<(K, &'a V)>,
}

/// Builds a [`DiffMap`] over `my` (the current side) and `other` (the side to
/// compare against). Nothing is pulled from either iterator until the result
/// is iterated.
fn diff_map<'a, K, V, I1, I2>(my: I1, other: I2) -> DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    DiffMap {
        my_it: my,
        other_it: other,
        my: None,
        other: None,
    }
}

/// How a key differs between the two sides compared by a [`DiffMap`].
enum DiffResult {
    /// the key is only present in `other`
    Added,
    /// the key is only present in `my`
    Removed,
    /// the key is present on both sides, with different values
    Changed,
}

// This walks the keys of both iterators in lockstep: since keys are sorted,
// comparing the two pending keys is enough to tell which side a key belongs to.
// Keys present on both sides with equal values are not reported.
impl<'a, K, V, I1, I2> std::iter::Iterator for DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    type Item = (K, DiffResult);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // use the pending pair of each side, or pull a fresh one
            let mine = self.my.take().or_else(|| self.my_it.next());
            let theirs = self.other.take().or_else(|| self.other_it.next());

            let (mine, theirs) = match (mine, theirs) {
                // nothing left in `my`: every remaining key of `other` is an
                // addition, and the iteration stops once `other` is empty too
                (None, remaining) => {
                    return remaining.map(|(key, _)| (key, DiffResult::Added));
                }
                // nothing left in `other`: every remaining key of `my` is a removal
                (Some((key, _)), None) => return Some((key, DiffResult::Removed)),
                (Some(mine), Some(theirs)) => (mine, theirs),
            };

            let (my_key, my_value) = mine;
            let (other_key, other_value) = theirs;

            match my_key.cmp(&other_key) {
                // the key is present in `my` but not in `other`
                std::cmp::Ordering::Less => {
                    self.other = Some((other_key, other_value));
                    return Some((my_key, DiffResult::Removed));
                }
                // the key is present in `other` but not in `my`
                std::cmp::Ordering::Greater => {
                    self.my = Some((my_key, my_value));
                    return Some((other_key, DiffResult::Added));
                }
                // the key is present on both sides: report it only if the value changed
                std::cmp::Ordering::Equal if my_value != other_value => {
                    return Some((my_key, DiffResult::Changed));
                }
                // same key, same value: move on to the next key of both sides
                std::cmp::Ordering::Equal => {}
            }
        }
    }
}
2048
2049#[cfg(test)]
2050mod tests {
2051    use rand::{RngExt, rng, seq::SliceRandom};
2052
2053    use super::*;
2054    use crate::proto::command::{
2055        CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RequestTcpFrontend,
2056        RulePosition,
2057    };
2058
2059    #[test]
2060    fn serialize() {
2061        let mut state: ConfigState = Default::default();
2062        state
2063            .dispatch(
2064                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2065                    cluster_id: Some(String::from("cluster_1")),
2066                    hostname: String::from("lolcatho.st:8080"),
2067                    path: PathRule::prefix(String::from("/")),
2068                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2069                    position: RulePosition::Tree.into(),
2070                    ..Default::default()
2071                })
2072                .into(),
2073            )
2074            .expect("Could not execute request");
2075        state
2076            .dispatch(
2077                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2078                    cluster_id: Some(String::from("cluster_2")),
2079                    hostname: String::from("test.local"),
2080                    path: PathRule::prefix(String::from("/abc")),
2081                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2082                    position: RulePosition::Pre.into(),
2083                    ..Default::default()
2084                })
2085                .into(),
2086            )
2087            .expect("Could not execute request");
2088        state
2089            .dispatch(
2090                &RequestType::AddBackend(AddBackend {
2091                    cluster_id: String::from("cluster_1"),
2092                    backend_id: String::from("cluster_1-0"),
2093                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2094                    ..Default::default()
2095                })
2096                .into(),
2097            )
2098            .expect("Could not execute request");
2099        state
2100            .dispatch(
2101                &RequestType::AddBackend(AddBackend {
2102                    cluster_id: String::from("cluster_1"),
2103                    backend_id: String::from("cluster_1-1"),
2104                    address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
2105                    ..Default::default()
2106                })
2107                .into(),
2108            )
2109            .expect("Could not execute request");
2110        state
2111            .dispatch(
2112                &RequestType::AddBackend(AddBackend {
2113                    cluster_id: String::from("cluster_2"),
2114                    backend_id: String::from("cluster_2-0"),
2115                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2116                    ..Default::default()
2117                })
2118                .into(),
2119            )
2120            .expect("Could not execute request");
2121        state
2122            .dispatch(
2123                &RequestType::AddBackend(AddBackend {
2124                    cluster_id: String::from("cluster_1"),
2125                    backend_id: String::from("cluster_1-3"),
2126                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2127                    ..Default::default()
2128                })
2129                .into(),
2130            )
2131            .expect("Could not execute request");
2132        state
2133            .dispatch(
2134                &RequestType::RemoveBackend(RemoveBackend {
2135                    cluster_id: String::from("cluster_1"),
2136                    backend_id: String::from("cluster_1-3"),
2137                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2138                })
2139                .into(),
2140            )
2141            .expect("Could not execute request");
2142
2143        /*
2144        let encoded = state.encode();
2145        println!("serialized:\n{}", encoded);
2146
2147        let new_state: Option<HttpProxy> = decode_str(&encoded);
2148        println!("deserialized:\n{:?}", new_state);
2149        assert_eq!(new_state, Some(state));
2150        */
2151        //assert!(false);
2152    }
2153
2154    #[test]
2155    fn diff() {
2156        let mut state: ConfigState = Default::default();
2157        state
2158            .dispatch(
2159                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2160                    cluster_id: Some(String::from("cluster_1")),
2161                    hostname: String::from("lolcatho.st:8080"),
2162                    path: PathRule::prefix(String::from("/")),
2163                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2164                    position: RulePosition::Post.into(),
2165                    ..Default::default()
2166                })
2167                .into(),
2168            )
2169            .expect("Could not execute request");
2170        state
2171            .dispatch(
2172                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2173                    cluster_id: Some(String::from("cluster_2")),
2174                    hostname: String::from("test.local"),
2175                    path: PathRule::prefix(String::from("/abc")),
2176                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2177                    ..Default::default()
2178                })
2179                .into(),
2180            )
2181            .expect("Could not execute request");
2182        state
2183            .dispatch(
2184                &RequestType::AddBackend(AddBackend {
2185                    cluster_id: String::from("cluster_1"),
2186                    backend_id: String::from("cluster_1-0"),
2187                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2188                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2189                    ..Default::default()
2190                })
2191                .into(),
2192            )
2193            .expect("Could not execute request");
2194        state
2195            .dispatch(
2196                &RequestType::AddBackend(AddBackend {
2197                    cluster_id: String::from("cluster_1"),
2198                    backend_id: String::from("cluster_1-1"),
2199                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
2200                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2201                    ..Default::default()
2202                })
2203                .into(),
2204            )
2205            .expect("Could not execute request");
2206        state
2207            .dispatch(
2208                &RequestType::AddBackend(AddBackend {
2209                    cluster_id: String::from("cluster_2"),
2210                    backend_id: String::from("cluster_2-0"),
2211                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2212                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2213                    ..Default::default()
2214                })
2215                .into(),
2216            )
2217            .expect("Could not execute request");
2218        state
2219            .dispatch(
2220                &RequestType::AddCluster(Cluster {
2221                    cluster_id: String::from("cluster_2"),
2222                    sticky_session: true,
2223                    https_redirect: true,
2224                    ..Default::default()
2225                })
2226                .into(),
2227            )
2228            .expect("Could not execute request");
2229
2230        let mut state2: ConfigState = Default::default();
2231        state2
2232            .dispatch(
2233                &RequestType::AddHttpFrontend(RequestHttpFrontend {
2234                    cluster_id: Some(String::from("cluster_1")),
2235                    hostname: String::from("lolcatho.st:8080"),
2236                    path: PathRule::prefix(String::from("/")),
2237                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2238                    position: RulePosition::Post.into(),
2239                    ..Default::default()
2240                })
2241                .into(),
2242            )
2243            .expect("Could not execute request");
2244        state2
2245            .dispatch(
2246                &RequestType::AddBackend(AddBackend {
2247                    cluster_id: String::from("cluster_1"),
2248                    backend_id: String::from("cluster_1-0"),
2249                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2250                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2251                    ..Default::default()
2252                })
2253                .into(),
2254            )
2255            .expect("Could not execute request");
2256        state2
2257            .dispatch(
2258                &RequestType::AddBackend(AddBackend {
2259                    cluster_id: String::from("cluster_1"),
2260                    backend_id: String::from("cluster_1-1"),
2261                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
2262                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2263                    ..Default::default()
2264                })
2265                .into(),
2266            )
2267            .expect("Could not execute request");
2268        state2
2269            .dispatch(
2270                &RequestType::AddBackend(AddBackend {
2271                    cluster_id: String::from("cluster_1"),
2272                    backend_id: String::from("cluster_1-2"),
2273                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2274                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2275                    ..Default::default()
2276                })
2277                .into(),
2278            )
2279            .expect("Could not execute request");
2280        state2
2281            .dispatch(
2282                &RequestType::AddCluster(Cluster {
2283                    cluster_id: String::from("cluster_3"),
2284                    sticky_session: false,
2285                    https_redirect: false,
2286                    ..Default::default()
2287                })
2288                .into(),
2289            )
2290            .expect("Could not execute request");
2291
2292        let e: Vec<Request> = vec![
2293            RequestType::RemoveHttpFrontend(RequestHttpFrontend {
2294                cluster_id: Some(String::from("cluster_2")),
2295                hostname: String::from("test.local"),
2296                path: PathRule::prefix(String::from("/abc")),
2297                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2298                ..Default::default()
2299            })
2300            .into(),
2301            RequestType::RemoveBackend(RemoveBackend {
2302                cluster_id: String::from("cluster_2"),
2303                backend_id: String::from("cluster_2-0"),
2304                address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2305            })
2306            .into(),
2307            RequestType::AddBackend(AddBackend {
2308                cluster_id: String::from("cluster_1"),
2309                backend_id: String::from("cluster_1-2"),
2310                address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2311                load_balancing_parameters: Some(LoadBalancingParams::default()),
2312                ..Default::default()
2313            })
2314            .into(),
2315            RequestType::RemoveCluster(String::from("cluster_2")).into(),
2316            RequestType::AddCluster(Cluster {
2317                cluster_id: String::from("cluster_3"),
2318                sticky_session: false,
2319                https_redirect: false,
2320                ..Default::default()
2321            })
2322            .into(),
2323        ];
2324        let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
2325
2326        let d = state.diff(&state2);
2327        let diff = HashSet::from_iter(d.iter());
2328        println!("diff requests:\n{diff:#?}\n");
2329        println!("expected diff requests:\n{expected_diff:#?}\n");
2330
2331        let hash1 = state.hash_state();
2332        let hash2 = state2.hash_state();
2333        let mut state3 = state.clone();
2334        state3
2335            .dispatch(
2336                &RequestType::AddBackend(AddBackend {
2337                    cluster_id: String::from("cluster_1"),
2338                    backend_id: String::from("cluster_1-2"),
2339                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2340                    load_balancing_parameters: Some(LoadBalancingParams::default()),
2341                    ..Default::default()
2342                })
2343                .into(),
2344            )
2345            .expect("Could not execute request");
2346        let hash3 = state3.hash_state();
2347        println!("state 1 hashes: {hash1:#?}");
2348        println!("state 2 hashes: {hash2:#?}");
2349        println!("state 3 hashes: {hash3:#?}");
2350
2351        assert_eq!(diff, expected_diff);
2352    }
2353
/// Checks that `get_cluster_ids_by_domain` returns the clusters registered on
/// a hostname, can filter them by path, and returns an empty set for a
/// hostname that has no frontend.
#[test]
fn cluster_ids_by_domain() {
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let https_address = SocketAddress::new_v4(0, 0, 0, 0, 8443);

    // Every frontend of this test is registered on the same hostname.
    let frontend = |cluster_id: &str, prefix: &str, address: SocketAddress| RequestHttpFrontend {
        cluster_id: Some(cluster_id.to_owned()),
        hostname: String::from("lolcatho.st"),
        path: PathRule::prefix(prefix.to_owned()),
        address,
        ..Default::default()
    };

    let registrations = vec![
        RequestType::AddHttpFrontend(frontend("MyCluster_1", "", http_address)),
        RequestType::AddHttpFrontend(frontend("MyCluster_2", "/api", http_address)),
        RequestType::AddHttpsFrontend(frontend("MyCluster_1", "", https_address)),
        RequestType::AddHttpsFrontend(frontend("MyCluster_2", "/api", https_address)),
    ];

    let mut config = ConfigState::new();
    for registration in registrations {
        config
            .dispatch(&registration.into())
            .expect("Could not execute request");
    }

    let both_clusters: HashSet<ClusterId> = HashSet::from([
        String::from("MyCluster_1"),
        String::from("MyCluster_2"),
    ]);
    let second_cluster_only: HashSet<ClusterId> = HashSet::from([String::from("MyCluster_2")]);
    let no_cluster: HashSet<ClusterId> = HashSet::new();

    // (queried domain, optional path filter, expected cluster ids)
    let cases = [
        ("lolcatho.st", None, &both_clusters),
        ("lolcatho.st", Some("/api"), &second_cluster_only),
        ("lolcathost", None, &no_cluster),
        ("lolcathost", Some("/sozu"), &no_cluster),
    ];
    for (domain, path, expected) in cases {
        let found = config.get_cluster_ids_by_domain(domain.to_owned(), path.map(String::from));
        assert_eq!(&found, expected);
    }
}
2429
/// Adding a backend twice with the same cluster, id and address must leave a
/// single entry in the state, carrying the values of the latest request.
#[test]
fn duplicate_backends() {
    let mut state = ConfigState::default();

    let first_request = RequestType::AddBackend(AddBackend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&first_request)
        .expect("Could not execute request");

    // Same backend again, this time with a sticky id.
    let b = Backend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: "127.0.0.1:1026".parse().unwrap(),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: Some("sticky".to_string()),
        backup: None,
    };
    let second_request = RequestType::AddBackend(b.clone().to_add_backend()).into();
    state
        .dispatch(&second_request)
        .expect("Could not execute order");

    let stored_backends = state.backends.get("cluster_1").unwrap();
    assert_eq!(stored_backends, &vec![b]);
}
2461
/// Removing a backend shrinks the cluster's backend list by one; removing it
/// a second time is reported as `StateError::NoChange` and leaves the list
/// untouched.
#[test]
fn remove_backend() {
    let shared_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);
    let backend_count =
        |state: &ConfigState| state.backends.get("cluster_1").unwrap().len();

    let mut state = ConfigState::default();
    let add_cluster = RequestType::AddCluster(Cluster {
        cluster_id: String::from("cluster_1"),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&add_cluster)
        .expect("Could not execute request");

    // Ten backends on one address, told apart by their ids only.
    for index in 0..10 {
        let add_backend = RequestType::AddBackend(AddBackend {
            cluster_id: String::from("cluster_1"),
            backend_id: format!("cluster_1-{index}"),
            address: shared_address,
            ..Default::default()
        })
        .into();
        state
            .dispatch(&add_backend)
            .expect("Could not execute request");
    }
    assert_eq!(backend_count(&state), 10);

    let remove_first_backend = RequestType::RemoveBackend(RemoveBackend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: shared_address,
    })
    .into();

    let first_removal = state.dispatch(&remove_first_backend);
    assert!(first_removal.is_ok());
    assert_eq!(backend_count(&state), 9);

    // The backend is already gone: the state must report that nothing changed.
    let second_removal = state.dispatch(&remove_first_backend);
    assert!(matches!(second_removal, Err(StateError::NoChange)));
    assert_eq!(backend_count(&state), 9);
}
2507
/// Repeatedly registers ten backends, then removes a random, non-empty,
/// strict subset of them in random order; every removal must succeed.
#[test]
fn remove_backends_randomly() {
    let shared_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);

    let mut state = ConfigState::default();
    let add_cluster = RequestType::AddCluster(Cluster {
        cluster_id: String::from("cluster_1"),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&add_cluster)
        .expect("Could not execute request");

    for _round in 0..1000 {
        // (Re-)register all ten backends.
        for index in 0..10 {
            let add_backend = RequestType::AddBackend(AddBackend {
                cluster_id: String::from("cluster_1"),
                backend_id: format!("cluster_1-{index}"),
                address: shared_address,
                ..Default::default()
            })
            .into();
            state
                .dispatch(&add_backend)
                .expect("Could not execute request");
        }

        // Pick between 1 and 9 distinct backend indexes, in random order.
        let mut generator = rng();
        let mut indexes: Vec<i32> = (0..10).collect();
        indexes.shuffle(&mut generator);
        let random_count = generator.random_range(1..indexes.len());

        for picked in indexes.into_iter().take(random_count) {
            let remove_backend = RequestType::RemoveBackend(RemoveBackend {
                cluster_id: String::from("cluster_1"),
                backend_id: format!("cluster_1-{picked}"),
                address: shared_address,
            })
            .into();
            let removal = state.dispatch(&remove_backend);
            assert!(removal.is_ok());
        }
    }
}
2555
/// Diffing two states whose listeners differ in configuration and activation
/// must produce exactly the ordered list of requests asserted at the end.
#[test]
fn listener_diff() {
    let tcp_address = SocketAddress::new_v4(0, 0, 0, 0, 1234);
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let https_address = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let custom_http_answers = Some(CustomHttpAnswers {
        answer_404: Some("test".to_string()),
        ..Default::default()
    });

    // Request builders shared by the two states and by the expected diff.
    let activate = |address: SocketAddress, proxy: ListenerType| {
        RequestType::ActivateListener(ActivateListener {
            address,
            proxy: proxy.into(),
            from_scm: false,
        })
    };
    let remove = |address: SocketAddress, proxy: ListenerType| {
        RequestType::RemoveListener(RemoveListener {
            address,
            proxy: proxy.into(),
        })
    };
    let proxied_tcp_listener = || {
        RequestType::AddTcpListener(TcpListenerConfig {
            address: tcp_address,
            expect_proxy: true,
            ..Default::default()
        })
    };
    let answering_http_listener = || {
        RequestType::AddHttpListener(HttpListenerConfig {
            address: http_address,
            http_answers: custom_http_answers.clone(),
            ..Default::default()
        })
    };
    let answering_https_listener = || {
        RequestType::AddHttpsListener(HttpsListenerConfig {
            address: https_address,
            http_answers: custom_http_answers.clone(),
            ..Default::default()
        })
    };

    // First state: default listeners; TCP and HTTPS are activated, HTTP is not.
    let mut state = ConfigState::default();
    let initial_requests = vec![
        RequestType::AddTcpListener(TcpListenerConfig {
            address: tcp_address,
            ..Default::default()
        }),
        activate(tcp_address, ListenerType::Tcp),
        RequestType::AddHttpListener(HttpListenerConfig {
            address: http_address,
            ..Default::default()
        }),
        RequestType::AddHttpsListener(HttpsListenerConfig {
            address: https_address,
            ..Default::default()
        }),
        activate(https_address, ListenerType::Https),
    ];
    for request_type in initial_requests {
        state
            .dispatch(&request_type.into())
            .expect("Could not execute request");
    }

    // Second state: modified listeners; HTTP and HTTPS are activated, TCP is not.
    let mut state2 = ConfigState::default();
    let target_requests = vec![
        proxied_tcp_listener(),
        answering_http_listener(),
        activate(http_address, ListenerType::Http),
        answering_https_listener(),
        activate(https_address, ListenerType::Https),
    ];
    for request_type in target_requests {
        state2
            .dispatch(&request_type.into())
            .expect("Could not execute request");
    }

    // The order matters: the diff is compared as a `Vec`, not as a set.
    let e: Vec<Request> = vec![
        remove(tcp_address, ListenerType::Tcp).into(),
        proxied_tcp_listener().into(),
        RequestType::DeactivateListener(DeactivateListener {
            address: tcp_address,
            proxy: ListenerType::Tcp.into(),
            to_scm: false,
        })
        .into(),
        remove(http_address, ListenerType::Http).into(),
        answering_http_listener().into(),
        activate(http_address, ListenerType::Http).into(),
        remove(https_address, ListenerType::Https).into(),
        answering_https_listener().into(),
    ];

    let diff = state.diff(&state2);
    println!("expected diff requests:\n{e:#?}\n");
    println!("diff requests:\n{diff:#?}\n");

    // The results are unused; the calls are kept so that hashing both states still runs.
    let _hash1 = state.hash_state();
    let _hash2 = state2.hash_state();

    assert_eq!(diff, e);
}
2721
/// A certificate added to the state must be retrievable both by its
/// fingerprint and by one of its domain names.
#[test]
fn certificate_retrieval() {
    let add_certificate = AddCertificate {
        address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
        certificate: CertificateAndKey {
            certificate: String::from(include_str!("../assets/certificate.pem")),
            key: String::from(include_str!("../assets/key.pem")),
            certificate_chain: vec![],
            versions: vec![],
            names: vec!["lolcatho.st".to_string()],
        },
        expired_at: None,
    };

    let mut state = ConfigState::default();
    let request = RequestType::AddCertificate(add_certificate).into();
    state.dispatch(&request).expect("Could not add certificate");

    println!("state: {state:#?}");

    // Lookup by fingerprint, given as a hexadecimal string.
    let fingerprint_filter = QueryCertificatesFilters {
        domain: None,
        fingerprint: Some(
            "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5".to_string(),
        ),
    };
    let certificates_found_by_fingerprint = state.get_certificates(fingerprint_filter);

    println!("found certificate: {certificates_found_by_fingerprint:#?}");

    assert!(!certificates_found_by_fingerprint.is_empty());

    // Lookup by domain name.
    let domain_filter = QueryCertificatesFilters {
        domain: Some("lolcatho.st".to_string()),
        fingerprint: None,
    };
    let certificates_found_by_domain = state.get_certificates(domain_filter);

    assert!(!certificates_found_by_domain.is_empty());
}
2766
/// `count_backends` must follow every backend addition and removal, summing
/// the backends of all clusters.
#[test]
fn count_backends_across_clusters() {
    let first_backend_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);

    let mut state = ConfigState::default();
    assert_eq!(state.count_backends(), 0);

    // (cluster id, backend id, address, backend count expected after the addition)
    let additions = [
        ("cluster_1", "cluster_1-0", first_backend_address, 1),
        (
            "cluster_1",
            "cluster_1-1",
            SocketAddress::new_v4(127, 0, 0, 1, 1027),
            2,
        ),
        // this one lands in a second cluster
        (
            "cluster_2",
            "cluster_2-0",
            SocketAddress::new_v4(192, 168, 1, 1, 8080),
            3,
        ),
    ];
    for (cluster_id, backend_id, address, expected_count) in additions {
        let add_backend = RequestType::AddBackend(AddBackend {
            cluster_id: cluster_id.to_owned(),
            backend_id: backend_id.to_owned(),
            address,
            ..Default::default()
        })
        .into();
        state
            .dispatch(&add_backend)
            .expect("Could not execute request");
        assert_eq!(state.count_backends(), expected_count);
    }

    // Removing the very first backend brings the count back down.
    let remove_backend = RequestType::RemoveBackend(RemoveBackend {
        cluster_id: String::from("cluster_1"),
        backend_id: String::from("cluster_1-0"),
        address: first_backend_address,
    })
    .into();
    state
        .dispatch(&remove_backend)
        .expect("Could not execute request");
    assert_eq!(state.count_backends(), 2);
}
2826
/// `count_frontends` must account for HTTP, HTTPS and TCP frontends alike,
/// and decrease when a frontend is removed.
#[test]
fn count_frontends_across_types() {
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let https_address = SocketAddress::new_v4(0, 0, 0, 0, 8443);

    // HTTP(S) frontend of "cluster_1" on the "/" prefix, in the routing tree.
    let web_frontend = |hostname: &str, address: SocketAddress| RequestHttpFrontend {
        cluster_id: Some(String::from("cluster_1")),
        hostname: hostname.to_owned(),
        path: PathRule::prefix(String::from("/")),
        address,
        position: RulePosition::Tree.into(),
        ..Default::default()
    };
    // TCP frontend of "cluster_2".
    let tcp_frontend = |address: SocketAddress| RequestTcpFrontend {
        cluster_id: String::from("cluster_2"),
        address,
        ..Default::default()
    };

    let mut state = ConfigState::default();
    assert_eq!(state.count_frontends(), 0);

    // (request to dispatch, frontend count expected afterwards)
    let steps = vec![
        (
            RequestType::AddHttpFrontend(web_frontend("example.com", http_address)),
            1,
        ),
        (
            RequestType::AddHttpsFrontend(web_frontend("secure.example.com", https_address)),
            2,
        ),
        (
            RequestType::AddTcpFrontend(tcp_frontend(SocketAddress::new_v4(0, 0, 0, 0, 5432))),
            3,
        ),
        // a second TCP frontend on the same cluster
        (
            RequestType::AddTcpFrontend(tcp_frontend(SocketAddress::new_v4(0, 0, 0, 0, 5433))),
            4,
        ),
        // the HTTP frontend added first goes away
        (
            RequestType::RemoveHttpFrontend(web_frontend("example.com", http_address)),
            3,
        ),
    ];
    for (request_type, expected_count) in steps {
        state
            .dispatch(&request_type.into())
            .expect("Could not execute request");
        assert_eq!(state.count_frontends(), expected_count);
    }
}
2907
2908    // ── helpers ────────────────────────────────────────────────────────────────
2909
2910    fn make_https_listener(address: SocketAddress) -> HttpsListenerConfig {
2911        HttpsListenerConfig {
2912            address,
2913            sticky_name: "SOZUBALANCEID".to_owned(),
2914            front_timeout: 60,
2915            back_timeout: 30,
2916            connect_timeout: 3,
2917            request_timeout: 10,
2918            ..Default::default()
2919        }
2920    }
2921
2922    fn make_http_listener(address: SocketAddress) -> HttpListenerConfig {
2923        HttpListenerConfig {
2924            address,
2925            sticky_name: "SOZUBALANCEID".to_owned(),
2926            front_timeout: 60,
2927            back_timeout: 30,
2928            connect_timeout: 3,
2929            request_timeout: 10,
2930            ..Default::default()
2931        }
2932    }
2933
2934    fn make_tcp_listener(address: SocketAddress) -> TcpListenerConfig {
2935        TcpListenerConfig {
2936            address,
2937            front_timeout: 60,
2938            back_timeout: 30,
2939            connect_timeout: 3,
2940            ..Default::default()
2941        }
2942    }
2943
2944    // ── update_https_listener ──────────────────────────────────────────────────
2945
/// Happy path: an update carrying two H2 flood knobs is applied to the stored
/// listener, while the fields absent from the update keep their values.
#[test]
fn update_https_listener_happy_path_h2_knobs() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update_listener = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_max_rst_stream_per_window: Some(50),
        h2_max_ping_per_window: Some(20),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update_listener)
        .expect("update must succeed");

    let key = SocketAddr::from(addr);
    let listener = state
        .https_listeners
        .get(&key)
        .expect("listener must be present");

    // The two patched knobs carry the new values...
    assert_eq!(listener.h2_max_rst_stream_per_window, Some(50));
    assert_eq!(listener.h2_max_ping_per_window, Some(20));
    // ...and what the update did not mention is left alone.
    assert_eq!(listener.front_timeout, 60);
    assert_eq!(listener.h2_max_settings_per_window, None);
}
2976
/// NotFound: updating an HTTPS listener on an address that was never
/// registered fails with `StateError::NotFound` of kind
/// `ObjectKind::HttpsListener`.
#[test]
fn update_https_listener_not_found() {
    let unregistered_address = SocketAddress::new_v4(1, 2, 3, 4, 9999);
    let update_listener = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: unregistered_address,
        h2_max_rst_stream_per_window: Some(50),
        ..Default::default()
    })
    .into();

    let mut state = ConfigState::new();
    let err = state.dispatch(&update_listener).unwrap_err();

    let is_not_found = matches!(
        err,
        StateError::NotFound {
            kind: ObjectKind::HttpsListener,
            ..
        }
    );
    assert!(is_not_found, "expected NotFound, got: {err}");
}
3001
/// No-op: an update that only names the listener address (every optional
/// field left unset) succeeds, and the checked fields keep the values the
/// listener was created with.
#[test]
fn update_https_listener_noop_patch() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let original = make_https_listener(addr);

    let mut state = ConfigState::new();
    let add_listener = RequestType::AddHttpsListener(original.clone()).into();
    state.dispatch(&add_listener).unwrap();

    let empty_update = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        ..Default::default()
    })
    .into();
    state
        .dispatch(&empty_update)
        .expect("no-op patch must succeed");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(listener.front_timeout, original.front_timeout);
    assert_eq!(
        listener.h2_max_rst_stream_per_window,
        original.h2_max_rst_stream_per_window
    );
}
3028
/// InvalidValue: an update setting the `h2_max_rst_stream_per_window` flood
/// knob to 0 is refused with `StateError::InvalidValue` naming that field.
#[test]
fn update_https_listener_invalid_value_flood_knob_zero() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let zero_knob_update = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_max_rst_stream_per_window: Some(0),
        ..Default::default()
    })
    .into();
    let err = state.dispatch(&zero_knob_update).unwrap_err();

    let is_invalid_knob = matches!(
        err,
        StateError::InvalidValue {
            field: "h2_max_rst_stream_per_window",
            ..
        }
    );
    assert!(
        is_invalid_knob,
        "expected InvalidValue for flood knob 0, got: {err}"
    );
}
3057
/// AddCluster: a cluster whose inline `health_check` has a URI containing
/// CR/LF must be refused, and must not be stored in `ConfigState`.
///
/// Rationale: without this guard, TOML reload / SaveState / direct API
/// AddCluster requests would bypass the check done on the SetHealthCheck
/// side, letting an attacker-controlled health-check URI smuggle CR/LF into
/// outbound HTTP/1.1 probes.
#[test]
fn add_cluster_invalid_health_check_uri_rejected() {
    use crate::proto::command::HealthCheckConfig;

    let smuggling_health_check = HealthCheckConfig {
        uri: String::from("/foo\r\nGET /admin"),
        interval: 5_000,
        timeout: 1_000,
        healthy_threshold: 2,
        unhealthy_threshold: 2,
        ..Default::default()
    };
    let add_cluster = RequestType::AddCluster(Cluster {
        cluster_id: String::from("evil_cluster"),
        health_check: Some(smuggling_health_check),
        ..Default::default()
    })
    .into();

    let mut state = ConfigState::new();
    let err = state.dispatch(&add_cluster).unwrap_err();

    let is_invalid_health_check = matches!(
        err,
        StateError::InvalidValue {
            field: "health_check",
            ..
        }
    );
    assert!(
        is_invalid_health_check,
        "expected InvalidValue for CRLF-bearing health-check URI, got: {err}"
    );

    let cluster_was_stored = state.clusters.contains_key("evil_cluster");
    assert!(
        !cluster_was_stored,
        "cluster must not be inserted when health_check fails validation",
    );
}
3101
/// ALPN validation: patching an HTTPS listener with an unknown ALPN value
/// (here `"h3"`) must fail with `InvalidValue` on `alpn_protocols`.
#[test]
fn update_https_listener_alpn_unknown_value_rejected() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let unknown_alpn = AlpnProtocols {
        values: vec!["h3".to_owned()],
    };
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(unknown_alpn),
        ..Default::default()
    })
    .into();

    let err = state.dispatch(&update).unwrap_err();
    let is_alpn_rejection = matches!(
        err,
        StateError::InvalidValue {
            field: "alpn_protocols",
            ..
        }
    );
    assert!(
        is_alpn_rejection,
        "expected InvalidValue for unknown ALPN, got: {err}"
    );
}
3134
/// ALPN validation: a patch whose `values` vec is empty means "reset to
/// default" and must be accepted, leaving the stored ALPN list empty.
#[test]
fn update_https_listener_alpn_empty_reset_accepted() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);

    // Seed the listener with a non-empty ALPN list so the reset is observable.
    let mut seeded = make_https_listener(addr);
    seeded.alpn_protocols = vec!["h2".to_owned()];
    let add_listener: Request = RequestType::AddHttpsListener(seeded).into();

    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(AlpnProtocols { values: Vec::new() }),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("empty ALPN reset must succeed");

    let key = SocketAddr::from(addr);
    let stored = state.https_listeners.get(&key).unwrap();
    assert!(
        stored.alpn_protocols.is_empty(),
        "ALPN must have been reset to empty"
    );
}
3163
/// ALPN validation: the values `["h2", "http/1.1"]` must be accepted and
/// stored on the listener in the order given.
#[test]
fn update_https_listener_alpn_valid_values_accepted() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let wanted_alpn = AlpnProtocols {
        values: vec!["h2".to_owned(), "http/1.1".to_owned()],
    };
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(wanted_alpn),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("valid ALPN must be accepted");

    let key = SocketAddr::from(addr);
    let stored = state.https_listeners.get(&key).unwrap();
    assert_eq!(stored.alpn_protocols, vec!["h2", "http/1.1"]);
}
3189
/// Absent ALPN wrapper: a patch that leaves `alpn_protocols` as `None`
/// must not alter the ALPN list already stored on the listener.
#[test]
fn update_https_listener_alpn_absent_preserves_existing() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);

    let mut seeded = make_https_listener(addr);
    seeded.alpn_protocols = vec!["h2".to_owned()];
    let add_listener: Request = RequestType::AddHttpsListener(seeded).into();

    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    // The patch sets only `front_timeout`; `alpn_protocols` stays unset.
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        front_timeout: Some(10),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).unwrap();

    let key = SocketAddr::from(addr);
    let stored = state.https_listeners.get(&key).unwrap();
    assert_eq!(
        stored.alpn_protocols,
        vec!["h2"],
        "ALPN must be preserved when not patched"
    );
}
3219
/// `sozu_id_header` validation: an empty header name must be rejected
/// with `InvalidValue` on `sozu_id_header`.
#[test]
fn update_https_listener_sozu_id_header_empty_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some(String::new()),
        ..Default::default()
    })
    .into();

    let err = state.dispatch(&update).unwrap_err();
    let is_header_rejection = matches!(
        err,
        StateError::InvalidValue {
            field: "sozu_id_header",
            ..
        }
    );
    assert!(
        is_header_rejection,
        "expected InvalidValue for empty header name, got: {err}"
    );
}
3248
/// `sozu_id_header` validation: a header name containing a colon must be
/// rejected with `InvalidValue` on `sozu_id_header`.
#[test]
fn update_https_listener_sozu_id_header_colon_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let malformed_name = "bad: value".to_owned();
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some(malformed_name),
        ..Default::default()
    })
    .into();

    let err = state.dispatch(&update).unwrap_err();
    let is_header_rejection = matches!(
        err,
        StateError::InvalidValue {
            field: "sozu_id_header",
            ..
        }
    );
    assert!(
        is_header_rejection,
        "expected InvalidValue for header name with colon, got: {err}"
    );
}
3277
/// `sozu_id_header` validation: a well-formed token (`X-Edge-Id`) must be
/// accepted and stored on the listener.
#[test]
fn update_https_listener_sozu_id_header_valid_accepted() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some("X-Edge-Id".to_owned()),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("valid header name must be accepted");

    let key = SocketAddr::from(addr);
    let stored = state.https_listeners.get(&key).unwrap();
    assert_eq!(stored.sozu_id_header.as_deref(), Some("X-Edge-Id"));
}
3299
/// Setting `h2_graceful_shutdown_deadline_seconds` to 0 must be allowed
/// (0 means "wait forever"), and the stored value must then be `Some(0)`.
#[test]
fn update_https_listener_graceful_shutdown_zero_allowed() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let add_listener: Request =
        RequestType::AddHttpsListener(make_https_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_graceful_shutdown_deadline_seconds: Some(0),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("graceful_shutdown_deadline=0 must be allowed");

    let key = SocketAddr::from(addr);
    let stored = state.https_listeners.get(&key).unwrap();
    assert_eq!(stored.h2_graceful_shutdown_deadline_seconds, Some(0));
}
3321
3322    // ── update_http_listener ───────────────────────────────────────────────────
3323
/// HTTP listener patch, happy path: the patched fields take their new
/// values while a field absent from the patch keeps its value.
#[test]
fn update_http_listener_happy_path() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let add_listener: Request =
        RequestType::AddHttpListener(make_http_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpListener(UpdateHttpListenerConfig {
        address: addr,
        front_timeout: Some(15),
        h2_max_rst_stream_per_window: Some(25),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).expect("HTTP update must succeed");

    let key = SocketAddr::from(addr);
    let stored = state.http_listeners.get(&key).unwrap();

    // Fields named in the patch.
    assert_eq!(stored.front_timeout, 15);
    assert_eq!(stored.h2_max_rst_stream_per_window, Some(25));
    // Not named in the patch, so it must be untouched.
    assert_eq!(stored.back_timeout, 30);
}
3349
/// HTTP listener: a flood knob set to 0 (here
/// `h2_max_window_update_stream0_per_window`) is rejected with
/// `InvalidValue` naming that field.
#[test]
fn update_http_listener_flood_knob_zero_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let add_listener: Request =
        RequestType::AddHttpListener(make_http_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpListener(UpdateHttpListenerConfig {
        address: addr,
        h2_max_window_update_stream0_per_window: Some(0),
        ..Default::default()
    })
    .into();

    let err = state.dispatch(&update).unwrap_err();
    let is_knob_rejection = matches!(
        err,
        StateError::InvalidValue {
            field: "h2_max_window_update_stream0_per_window",
            ..
        }
    );
    assert!(is_knob_rejection, "expected InvalidValue, got: {err}");
}
3378
3379    // ── update_tcp_listener ────────────────────────────────────────────────────
3380
/// TCP listener patch, happy path: `front_timeout` takes the patched
/// value while `back_timeout`, absent from the patch, keeps its value.
#[test]
fn update_tcp_listener_happy_path() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 9000);
    let add_listener: Request =
        RequestType::AddTcpListener(make_tcp_listener(addr)).into();
    let mut state = ConfigState::new();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateTcpListener(UpdateTcpListenerConfig {
        address: addr,
        front_timeout: Some(5),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).expect("TCP update must succeed");

    let key = SocketAddr::from(addr);
    let stored = state.tcp_listeners.get(&key).unwrap();
    assert_eq!(stored.front_timeout, 5);
    // Not named in the patch, so it must be untouched.
    assert_eq!(stored.back_timeout, 30);
}
3403
/// TCP listener: patching an address that was never registered yields
/// `NotFound` with kind `ObjectKind::TcpListener`.
#[test]
fn update_tcp_listener_not_found() {
    // No listener is added to the state before the update is dispatched.
    let unknown_addr = SocketAddress::new_v4(9, 9, 9, 9, 9999);
    let update: Request = RequestType::UpdateTcpListener(UpdateTcpListenerConfig {
        address: unknown_addr,
        front_timeout: Some(5),
        ..Default::default()
    })
    .into();

    let mut state = ConfigState::new();
    let err = state.dispatch(&update).unwrap_err();

    let is_missing_tcp_listener = matches!(
        err,
        StateError::NotFound {
            kind: ObjectKind::TcpListener,
            ..
        }
    );
    assert!(is_missing_tcp_listener, "expected NotFound, got: {err}");
}
3427
/// Regression guard: `ConfigState::dispatch` MUST accept
/// `SetMetricDetail` as a runtime-only verb (no persisted state
/// mutation). Should a later refactor remove the variant from the no-op
/// match arm so that it reaches the catch-all, the SetMetricDetail
/// dispatch path would silently break again with
/// `UndispatchableRequest`; this test fails in that case.
#[test]
fn dispatch_passes_through_set_metric_detail() {
    use crate::proto::command::{MetricDetail, SetMetricDetail};

    let set_detail = SetMetricDetail {
        client_id: "test:1".to_owned(),
        detail: Some(MetricDetail::DetailBackend as i32),
        ttl_seconds: Some(60),
        clear: Some(false),
        reason: Some("regression-guard".to_owned()),
        peer_pid: None,
        peer_session_ulid: None,
    };
    let req: Request = RequestType::SetMetricDetail(set_detail).into();

    let mut state = ConfigState::new();
    let outcome = state.dispatch(&req);
    outcome.expect("SetMetricDetail must traverse dispatch without UndispatchableRequest");
}
3451}