1use std::{
2 collections::{
3 BTreeMap, BTreeSet, HashMap, HashSet, btree_map::Entry as BTreeMapEntry,
4 hash_map::DefaultHasher,
5 },
6 fs::File,
7 hash::{Hash, Hasher},
8 io::Write,
9 iter::repeat,
10 net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16 ObjectKind,
17 certificate::{CertificateError, Fingerprint, calculate_fingerprint},
18 proto::{
19 command::{
20 ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster,
21 ClusterInformation, CustomHttpAnswers, DeactivateListener, FrontendFilters,
22 HealthChecksList, HttpListenerConfig, HttpsListenerConfig, InitialState,
23 ListedFrontends, ListenerType, ListenersList, PathRule, QueryCertificatesFilters,
24 RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate, Request,
25 RequestCounts, RequestHttpFrontend, RequestTcpFrontend, RequestUdpFrontend,
26 SetHealthCheck, SocketAddress, TcpListenerConfig, UdpListenerConfig,
27 UpdateHttpListenerConfig, UpdateHttpsListenerConfig, UpdateTcpListenerConfig,
28 UpdateUdpListenerConfig, WorkerRequest, request::RequestType,
29 },
30 display::format_request_type,
31 },
32 response::{Backend, HttpFrontend, TcpFrontend, UdpFrontend},
33};
34
/// Identifier of a cluster; used as the key of the cluster-indexed maps of [`ConfigState`].
pub type ClusterId = String;
37
/// Errors returned when a request cannot be applied to a [`ConfigState`].
#[derive(thiserror::Error, Debug)]
pub enum StateError {
    /// The request carried no `request_type`.
    #[error("Request came in empty")]
    EmptyRequest,
    /// The request was valid but changed nothing, e.g. removing a listener whose address
    /// is not registered.
    #[error("dispatching this request did not bring any change to the state")]
    NoChange,
    /// The request type is not one that `ConfigState::dispatch` handles.
    #[error("State can not handle this request")]
    UndispatchableRequest,
    /// The object targeted by the request (cluster, listener, frontend, ...) is unknown.
    #[error("Did not find {kind:?} with address or id '{id}'")]
    NotFound { kind: ObjectKind, id: String },
    /// An object with the same key is already registered.
    #[error("{kind:?} '{id}' already exists")]
    Exists { kind: ObjectKind, id: String },
    /// An integer field did not map to a known protobuf enum value
    /// (e.g. the `proxy` field converted with `ListenerType::try_from`).
    #[error("Wrong field value: {0}")]
    WrongFieldValue(UnknownEnumValue),
    /// Computing the fingerprint or applying the overriding names of a certificate failed.
    #[error("Could not add certificate: {0}")]
    AddCertificate(CertificateError),
    /// The fingerprint of the certificate to remove could not be hex-decoded.
    #[error("Could not remove certificate: {0}")]
    RemoveCertificate(String),
    /// A certificate replacement failed; the payload is the reason as text.
    #[error("Could not replace certificate: {0}")]
    ReplaceCertificate(String),
    /// A requested HTTP(S) frontend could not be converted with `to_frontend`.
    #[error(
        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
    )]
    FrontendConversion { frontend: String, error: String },
    /// An I/O error occurred while writing the state to a file.
    #[error("Could not write state to file: {0}")]
    FileError(std::io::Error),
    /// A field failed validation (e.g. a health check configuration).
    #[error("Invalid value for field '{field}': {reason}")]
    InvalidValue {
        field: &'static str,
        reason: &'static str,
    },
}
70
/// Configuration state: clusters, backends, listeners, frontends and certificates,
/// mutated one request at a time through `ConfigState::dispatch`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConfigState {
    /// Clusters, keyed by their own `cluster_id`.
    pub clusters: BTreeMap<ClusterId, Cluster>,
    /// Backends, grouped by the id of the cluster they belong to.
    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
    /// HTTP listeners, keyed by the address stored in the listener config.
    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
    /// HTTPS listeners, keyed by the address stored in the listener config.
    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
    /// TCP listeners, keyed by the address stored in the listener config.
    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
    /// UDP listeners, keyed by the address stored in the listener config.
    pub udp_listeners: BTreeMap<SocketAddr, UdpListenerConfig>,
    /// HTTP frontends, keyed by the `to_string()` rendering of the frontend request.
    pub http_fronts: BTreeMap<String, HttpFrontend>,
    /// HTTPS frontends, keyed by the `to_string()` rendering of the frontend request.
    pub https_fronts: BTreeMap<String, HttpFrontend>,
    /// TCP frontends, grouped by cluster id.
    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
    /// UDP frontends, grouped by cluster id.
    pub udp_fronts: HashMap<ClusterId, Vec<UdpFrontend>>,
    /// Certificates per listener address, indexed by fingerprint.
    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
    /// Number of dispatched requests, keyed by the `format_request_type` label.
    pub request_counts: BTreeMap<String, i32>,
}
102
103impl ConfigState {
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
109 let request_type = match &request.request_type {
110 Some(t) => t,
111 None => return Err(StateError::EmptyRequest),
112 };
113
114 self.increment_request_count(request);
115
116 let result = match request_type {
117 RequestType::AddCluster(cluster) => self.add_cluster(cluster),
118 RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
119 RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
120 RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
121 RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
122 RequestType::AddUdpListener(listener) => self.add_udp_listener(listener),
123 RequestType::RemoveListener(remove) => self.remove_listener(remove),
124 RequestType::ActivateListener(activate) => self.activate_listener(activate),
125 RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
126 RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
127 RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
128 RequestType::AddCertificate(add) => self.add_certificate(add),
129 RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
130 RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
131 RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
132 RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
133 RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
134 RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
135 RequestType::AddUdpFrontend(front) => self.add_udp_frontend(front),
136 RequestType::RemoveUdpFrontend(front) => self.remove_udp_frontend(front),
137 RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
138 RequestType::RemoveBackend(backend) => self.remove_backend(backend),
139 RequestType::UpdateHttpListener(patch) => self.update_http_listener(patch),
140 RequestType::UpdateHttpsListener(patch) => self.update_https_listener(patch),
141 RequestType::UpdateTcpListener(patch) => self.update_tcp_listener(patch),
142 RequestType::UpdateUdpListener(patch) => self.update_udp_listener(patch),
143 RequestType::SetHealthCheck(set) => self.set_health_check(set),
144 RequestType::RemoveHealthCheck(cluster_id) => self.remove_health_check(cluster_id),
145
146 RequestType::Logging(_)
153 | RequestType::CountRequests(_)
154 | RequestType::Status(_)
155 | RequestType::SoftStop(_)
156 | RequestType::QueryCertificatesFromWorkers(_)
157 | RequestType::QueryClusterById(_)
158 | RequestType::QueryClustersByDomain(_)
159 | RequestType::QueryMetrics(_)
160 | RequestType::QueryClustersHashes(_)
161 | RequestType::ConfigureMetrics(_)
162 | RequestType::SetMetricDetail(_)
163 | RequestType::ReturnListenSockets(_)
164 | RequestType::SetMaxConnectionsPerIp(_)
165 | RequestType::QueryMaxConnectionsPerIp(_)
166 | RequestType::HardStop(_) => Ok(()),
167
168 _other_request => Err(StateError::UndispatchableRequest),
169 };
170
171 #[cfg(debug_assertions)]
177 self.check_invariants();
178
179 result
180 }
181
182 #[cfg(debug_assertions)]
193 fn check_invariants(&self) {
194 for (addr, listener) in &self.http_listeners {
198 debug_assert_eq!(
199 SocketAddr::from(listener.address),
200 *addr,
201 "http_listener value address must match its map key"
202 );
203 }
204 for (addr, listener) in &self.https_listeners {
205 debug_assert_eq!(
206 SocketAddr::from(listener.address),
207 *addr,
208 "https_listener value address must match its map key"
209 );
210 }
211 for (addr, listener) in &self.tcp_listeners {
212 debug_assert_eq!(
213 SocketAddr::from(listener.address),
214 *addr,
215 "tcp_listener value address must match its map key"
216 );
217 }
218
219 for (cluster_id, cluster) in &self.clusters {
222 debug_assert_eq!(
223 &cluster.cluster_id, cluster_id,
224 "cluster value cluster_id must match its map key"
225 );
226 }
227
228 for (cluster_id, backends) in &self.backends {
233 for backend in backends {
234 debug_assert_eq!(
235 &backend.cluster_id, cluster_id,
236 "backend cluster_id must match its bucket key"
237 );
238 }
239 let unique: HashSet<(&String, &SocketAddr)> = backends
240 .iter()
241 .map(|b| (&b.backend_id, &b.address))
242 .collect();
243 debug_assert_eq!(
244 unique.len(),
245 backends.len(),
246 "backends within a cluster must be unique on (backend_id, address)"
247 );
248 }
249
250 for (cluster_id, fronts) in &self.tcp_fronts {
254 for front in fronts {
255 debug_assert_eq!(
256 &front.cluster_id, cluster_id,
257 "tcp_frontend cluster_id must match its bucket key"
258 );
259 }
260 let unique: HashSet<&TcpFrontend> = fronts.iter().collect();
261 debug_assert_eq!(
262 unique.len(),
263 fronts.len(),
264 "tcp frontends within a cluster must be unique"
265 );
266 }
267
268 let raw_frontends = self.http_fronts.len()
279 + self.https_fronts.len()
280 + self.count_tcp_frontends_raw()
281 + self.udp_fronts.values().map(|v| v.len()).sum::<usize>();
282 debug_assert_eq!(
283 self.count_frontends(),
284 raw_frontends,
285 "count_frontends must equal the sum of all frontend map entries"
286 );
287 let raw_backends: usize = self.backends.values().map(|v| v.len()).sum();
288 debug_assert_eq!(
289 self.count_backends(),
290 raw_backends,
291 "count_backends must equal the sum of all backend Vec lengths"
292 );
293 }
294
295 #[cfg(debug_assertions)]
298 fn count_tcp_frontends_raw(&self) -> usize {
299 self.tcp_fronts.values().map(|v| v.len()).sum()
300 }
301
302 fn increment_request_count(&mut self, request: &Request) {
304 if let Some(request_type) = &request.request_type {
305 let count = self
306 .request_counts
307 .entry(format_request_type(request_type).to_owned())
308 .or_insert(1);
309 *count += 1;
310 }
311 }
312
313 pub fn get_request_counts(&self) -> RequestCounts {
314 RequestCounts {
315 map: self.request_counts.clone(),
316 }
317 }
318
319 fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
320 if let Some(hc) = cluster.health_check.as_ref() {
327 if let Err(reason) = crate::config::validate_health_check_config(hc) {
328 return Err(StateError::InvalidValue {
329 field: "health_check",
330 reason,
331 });
332 }
333 }
334 let cluster = cluster.clone();
335 let cluster_id = cluster.cluster_id.clone();
339 self.clusters.insert(cluster_id.clone(), cluster);
340 debug_assert!(
341 self.clusters.contains_key(&cluster_id),
342 "add_cluster must leave the cluster present in the map"
343 );
344 debug_assert_eq!(
345 self.clusters.get(&cluster_id).map(|c| &c.cluster_id),
346 Some(&cluster_id),
347 "stored cluster must be keyed by its own cluster_id"
348 );
349 Ok(())
350 }
351
352 fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
353 let before = self.clusters.len();
354 match self.clusters.remove(cluster_id) {
355 Some(_) => {
356 debug_assert!(
357 !self.clusters.contains_key(cluster_id),
358 "remove_cluster must evict the cluster"
359 );
360 debug_assert_eq!(
361 self.clusters.len(),
362 before - 1,
363 "remove_cluster must drop exactly one entry"
364 );
365 Ok(())
366 }
367 None => {
368 debug_assert_eq!(
369 self.clusters.len(),
370 before,
371 "a failed remove_cluster must not mutate the map"
372 );
373 Err(StateError::NotFound {
374 kind: ObjectKind::Cluster,
375 id: cluster_id.to_owned(),
376 })
377 }
378 }
379 }
380
381 fn set_health_check(&mut self, set: &SetHealthCheck) -> Result<(), StateError> {
382 if let Err(reason) = crate::config::validate_health_check_config(&set.config) {
389 return Err(StateError::InvalidValue {
390 field: "health_check",
391 reason,
392 });
393 }
394 match self.clusters.get_mut(&set.cluster_id) {
395 Some(cluster) => {
396 cluster.health_check = Some(set.config.to_owned());
397 Ok(())
398 }
399 None => Err(StateError::NotFound {
400 kind: ObjectKind::Cluster,
401 id: set.cluster_id.to_owned(),
402 }),
403 }
404 }
405
406 fn remove_health_check(&mut self, cluster_id: &str) -> Result<(), StateError> {
407 match self.clusters.get_mut(cluster_id) {
408 Some(cluster) => {
409 cluster.health_check = None;
410 Ok(())
411 }
412 None => Err(StateError::NotFound {
413 kind: ObjectKind::Cluster,
414 id: cluster_id.to_owned(),
415 }),
416 }
417 }
418
419 pub fn list_health_checks(&self, cluster_id: Option<&str>) -> HealthChecksList {
420 let map = self
421 .clusters
422 .iter()
423 .filter(|(id, _)| cluster_id.is_none_or(|filter| filter == id.as_str()))
424 .filter_map(|(id, cluster)| {
425 cluster
426 .health_check
427 .as_ref()
428 .map(|hc| (id.to_owned(), hc.to_owned()))
429 })
430 .collect();
431 HealthChecksList { map }
432 }
433
434 fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
435 let address: SocketAddr = listener.address.into();
436 let before = self.http_listeners.len();
437 match self.http_listeners.entry(address) {
438 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
439 BTreeMapEntry::Occupied(_) => {
440 debug_assert_eq!(
441 self.http_listeners.len(),
442 before,
443 "a rejected duplicate add_http_listener must not mutate the map"
444 );
445 return Err(StateError::Exists {
446 kind: ObjectKind::HttpListener,
447 id: address.to_string(),
448 });
449 }
450 };
451 debug_assert!(
452 self.http_listeners.contains_key(&address),
453 "add_http_listener must insert the listener under its address"
454 );
455 debug_assert_eq!(
456 self.http_listeners.len(),
457 before + 1,
458 "add_http_listener inserts exactly one entry on the vacant path"
459 );
460 Ok(())
461 }
462
463 fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
464 let address: SocketAddr = listener.address.into();
465 let before = self.https_listeners.len();
466 match self.https_listeners.entry(address) {
467 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
468 BTreeMapEntry::Occupied(_) => {
469 debug_assert_eq!(
470 self.https_listeners.len(),
471 before,
472 "a rejected duplicate add_https_listener must not mutate the map"
473 );
474 return Err(StateError::Exists {
475 kind: ObjectKind::HttpsListener,
476 id: address.to_string(),
477 });
478 }
479 };
480 debug_assert!(
481 self.https_listeners.contains_key(&address),
482 "add_https_listener must insert the listener under its address"
483 );
484 debug_assert_eq!(
485 self.https_listeners.len(),
486 before + 1,
487 "add_https_listener inserts exactly one entry on the vacant path"
488 );
489 Ok(())
490 }
491
492 fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
493 let address: SocketAddr = listener.address.into();
494 let before = self.tcp_listeners.len();
495 match self.tcp_listeners.entry(address) {
496 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
497 BTreeMapEntry::Occupied(_) => {
498 debug_assert_eq!(
499 self.tcp_listeners.len(),
500 before,
501 "a rejected duplicate add_tcp_listener must not mutate the map"
502 );
503 return Err(StateError::Exists {
504 kind: ObjectKind::TcpListener,
505 id: address.to_string(),
506 });
507 }
508 };
509 debug_assert!(
510 self.tcp_listeners.contains_key(&address),
511 "add_tcp_listener must insert the listener under its address"
512 );
513 debug_assert_eq!(
514 self.tcp_listeners.len(),
515 before + 1,
516 "add_tcp_listener inserts exactly one entry on the vacant path"
517 );
518 Ok(())
519 }
520
521 fn add_udp_listener(&mut self, listener: &UdpListenerConfig) -> Result<(), StateError> {
522 let address: SocketAddr = listener.address.into();
523 match self.udp_listeners.entry(address) {
524 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
525 BTreeMapEntry::Occupied(_) => {
526 return Err(StateError::Exists {
527 kind: ObjectKind::UdpListener,
528 id: address.to_string(),
529 });
530 }
531 };
532 Ok(())
533 }
534
535 fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
536 match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
537 ListenerType::Http => self.remove_http_listener(&remove.address.into()),
538 ListenerType::Https => self.remove_https_listener(&remove.address.into()),
539 ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
540 ListenerType::Udp => self.remove_udp_listener(&remove.address.into()),
541 }
542 }
543
544 fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
545 let before = self.http_listeners.len();
546 if self.http_listeners.remove(address).is_none() {
547 debug_assert_eq!(
548 self.http_listeners.len(),
549 before,
550 "a failed remove_http_listener must not mutate the map"
551 );
552 return Err(StateError::NoChange);
553 }
554 debug_assert!(
555 !self.http_listeners.contains_key(address),
556 "remove_http_listener must evict the address"
557 );
558 debug_assert_eq!(
559 self.http_listeners.len(),
560 before - 1,
561 "remove_http_listener drops exactly one entry"
562 );
563 Ok(())
564 }
565
566 fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
567 let before = self.https_listeners.len();
568 if self.https_listeners.remove(address).is_none() {
569 debug_assert_eq!(
570 self.https_listeners.len(),
571 before,
572 "a failed remove_https_listener must not mutate the map"
573 );
574 return Err(StateError::NoChange);
575 }
576 debug_assert!(
577 !self.https_listeners.contains_key(address),
578 "remove_https_listener must evict the address"
579 );
580 debug_assert_eq!(
581 self.https_listeners.len(),
582 before - 1,
583 "remove_https_listener drops exactly one entry"
584 );
585 Ok(())
586 }
587
588 fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
589 let before = self.tcp_listeners.len();
590 if self.tcp_listeners.remove(address).is_none() {
591 debug_assert_eq!(
592 self.tcp_listeners.len(),
593 before,
594 "a failed remove_tcp_listener must not mutate the map"
595 );
596 return Err(StateError::NoChange);
597 }
598 debug_assert!(
599 !self.tcp_listeners.contains_key(address),
600 "remove_tcp_listener must evict the address"
601 );
602 debug_assert_eq!(
603 self.tcp_listeners.len(),
604 before - 1,
605 "remove_tcp_listener drops exactly one entry"
606 );
607 Ok(())
608 }
609
610 fn remove_udp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
611 if self.udp_listeners.remove(address).is_none() {
612 return Err(StateError::NoChange);
613 }
614 Ok(())
615 }
616
617 fn update_http_listener(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
624 validate_h2_flood_knobs_http(patch)?;
625
626 let address: SocketAddr = patch.address.into();
627 let listener =
628 self.http_listeners
629 .get_mut(&address)
630 .ok_or_else(|| StateError::NotFound {
631 kind: ObjectKind::HttpListener,
632 id: address.to_string(),
633 })?;
634
635 if let Some(v) = patch.public_address {
637 listener.public_address = Some(v);
638 }
639 if let Some(v) = patch.expect_proxy {
640 listener.expect_proxy = v;
641 }
642 if let Some(ref v) = patch.sticky_name {
643 listener.sticky_name = v.to_owned();
644 }
645 if let Some(v) = patch.front_timeout {
646 listener.front_timeout = v;
647 }
648 if let Some(v) = patch.back_timeout {
649 listener.back_timeout = v;
650 }
651 if let Some(v) = patch.connect_timeout {
652 listener.connect_timeout = v;
653 }
654 if let Some(v) = patch.request_timeout {
655 listener.request_timeout = v;
656 }
657 if let Some(patch_answers) = patch.http_answers.as_ref() {
658 merge_custom_http_answers(&mut listener.http_answers, patch_answers);
659 }
660 if let Some(v) = patch.h2_max_rst_stream_per_window {
662 listener.h2_max_rst_stream_per_window = Some(v);
663 }
664 if let Some(v) = patch.h2_max_ping_per_window {
665 listener.h2_max_ping_per_window = Some(v);
666 }
667 if let Some(v) = patch.h2_max_settings_per_window {
668 listener.h2_max_settings_per_window = Some(v);
669 }
670 if let Some(v) = patch.h2_max_empty_data_per_window {
671 listener.h2_max_empty_data_per_window = Some(v);
672 }
673 if let Some(v) = patch.h2_max_continuation_frames {
674 listener.h2_max_continuation_frames = Some(v);
675 }
676 if let Some(v) = patch.h2_max_glitch_count {
677 listener.h2_max_glitch_count = Some(v);
678 }
679 if let Some(v) = patch.h2_initial_connection_window {
680 listener.h2_initial_connection_window = Some(v);
681 }
682 if let Some(v) = patch.h2_max_concurrent_streams {
683 listener.h2_max_concurrent_streams = Some(v);
684 }
685 if let Some(v) = patch.h2_stream_shrink_ratio {
686 listener.h2_stream_shrink_ratio = Some(v);
687 }
688 if let Some(v) = patch.h2_max_rst_stream_lifetime {
689 listener.h2_max_rst_stream_lifetime = Some(v);
690 }
691 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
692 listener.h2_max_rst_stream_abusive_lifetime = Some(v);
693 }
694 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
695 listener.h2_max_rst_stream_emitted_lifetime = Some(v);
696 }
697 if let Some(v) = patch.h2_max_header_list_size {
698 listener.h2_max_header_list_size = Some(v);
699 }
700 if let Some(v) = patch.h2_max_header_table_size {
701 listener.h2_max_header_table_size = Some(v);
702 }
703 if let Some(v) = patch.h2_max_header_fields {
704 listener.h2_max_header_fields = Some(v);
705 }
706 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
707 listener.h2_stream_idle_timeout_seconds = Some(v);
708 }
709 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
711 listener.h2_graceful_shutdown_deadline_seconds = Some(v);
712 }
713 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
714 listener.h2_max_window_update_stream0_per_window = Some(v);
715 }
716 if let Some(ref v) = patch.sozu_id_header {
717 validate_sozu_id_header(v)?;
718 listener.sozu_id_header = Some(v.to_owned());
719 }
720 Ok(())
721 }
722
723 fn update_https_listener(
730 &mut self,
731 patch: &UpdateHttpsListenerConfig,
732 ) -> Result<(), StateError> {
733 validate_h2_flood_knobs_https(patch)?;
734
735 let address: SocketAddr = patch.address.into();
736 let listener =
737 self.https_listeners
738 .get_mut(&address)
739 .ok_or_else(|| StateError::NotFound {
740 kind: ObjectKind::HttpsListener,
741 id: address.to_string(),
742 })?;
743
744 if let Some(v) = patch.public_address {
746 listener.public_address = Some(v);
747 }
748 if let Some(v) = patch.expect_proxy {
749 listener.expect_proxy = v;
750 }
751 if let Some(ref v) = patch.sticky_name {
752 listener.sticky_name = v.to_owned();
753 }
754 if let Some(v) = patch.front_timeout {
755 listener.front_timeout = v;
756 }
757 if let Some(v) = patch.back_timeout {
758 listener.back_timeout = v;
759 }
760 if let Some(v) = patch.connect_timeout {
761 listener.connect_timeout = v;
762 }
763 if let Some(v) = patch.request_timeout {
764 listener.request_timeout = v;
765 }
766 if let Some(patch_answers) = patch.http_answers.as_ref() {
767 merge_custom_http_answers(&mut listener.http_answers, patch_answers);
768 }
769 if let Some(ref alpn_wrapper) = patch.alpn_protocols {
771 validate_alpn_protocols(&alpn_wrapper.values)?;
772 listener.alpn_protocols = alpn_wrapper.values.clone();
774 }
775 if let Some(v) = patch.strict_sni_binding {
776 listener.strict_sni_binding = Some(v);
777 }
778 if let Some(v) = patch.disable_http11 {
779 listener.disable_http11 = Some(v);
780 }
781 if let Some(v) = patch.h2_max_rst_stream_per_window {
783 listener.h2_max_rst_stream_per_window = Some(v);
784 }
785 if let Some(v) = patch.h2_max_ping_per_window {
786 listener.h2_max_ping_per_window = Some(v);
787 }
788 if let Some(v) = patch.h2_max_settings_per_window {
789 listener.h2_max_settings_per_window = Some(v);
790 }
791 if let Some(v) = patch.h2_max_empty_data_per_window {
792 listener.h2_max_empty_data_per_window = Some(v);
793 }
794 if let Some(v) = patch.h2_max_continuation_frames {
795 listener.h2_max_continuation_frames = Some(v);
796 }
797 if let Some(v) = patch.h2_max_glitch_count {
798 listener.h2_max_glitch_count = Some(v);
799 }
800 if let Some(v) = patch.h2_initial_connection_window {
801 listener.h2_initial_connection_window = Some(v);
802 }
803 if let Some(v) = patch.h2_max_concurrent_streams {
804 listener.h2_max_concurrent_streams = Some(v);
805 }
806 if let Some(v) = patch.h2_stream_shrink_ratio {
807 listener.h2_stream_shrink_ratio = Some(v);
808 }
809 if let Some(v) = patch.h2_max_rst_stream_lifetime {
810 listener.h2_max_rst_stream_lifetime = Some(v);
811 }
812 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
813 listener.h2_max_rst_stream_abusive_lifetime = Some(v);
814 }
815 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
816 listener.h2_max_rst_stream_emitted_lifetime = Some(v);
817 }
818 if let Some(v) = patch.h2_max_header_list_size {
819 listener.h2_max_header_list_size = Some(v);
820 }
821 if let Some(v) = patch.h2_max_header_table_size {
822 listener.h2_max_header_table_size = Some(v);
823 }
824 if let Some(v) = patch.h2_max_header_fields {
825 listener.h2_max_header_fields = Some(v);
826 }
827 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
828 listener.h2_stream_idle_timeout_seconds = Some(v);
829 }
830 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
832 listener.h2_graceful_shutdown_deadline_seconds = Some(v);
833 }
834 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
835 listener.h2_max_window_update_stream0_per_window = Some(v);
836 }
837 if let Some(ref v) = patch.sozu_id_header {
838 validate_sozu_id_header(v)?;
839 listener.sozu_id_header = Some(v.to_owned());
840 }
841 Ok(())
842 }
843
844 fn update_tcp_listener(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), StateError> {
849 let address: SocketAddr = patch.address.into();
850 let listener =
851 self.tcp_listeners
852 .get_mut(&address)
853 .ok_or_else(|| StateError::NotFound {
854 kind: ObjectKind::TcpListener,
855 id: address.to_string(),
856 })?;
857
858 if let Some(v) = patch.public_address {
859 listener.public_address = Some(v);
860 }
861 if let Some(v) = patch.expect_proxy {
862 listener.expect_proxy = v;
863 }
864 if let Some(v) = patch.front_timeout {
865 listener.front_timeout = v;
866 }
867 if let Some(v) = patch.back_timeout {
868 listener.back_timeout = v;
869 }
870 if let Some(v) = patch.connect_timeout {
871 listener.connect_timeout = v;
872 }
873 Ok(())
874 }
875
876 fn update_udp_listener(&mut self, patch: &UpdateUdpListenerConfig) -> Result<(), StateError> {
881 let address: SocketAddr = patch.address.into();
882 let listener =
883 self.udp_listeners
884 .get_mut(&address)
885 .ok_or_else(|| StateError::NotFound {
886 kind: ObjectKind::UdpListener,
887 id: address.to_string(),
888 })?;
889
890 if let Some(v) = patch.public_address {
891 listener.public_address = Some(v);
892 }
893 if let Some(v) = patch.front_timeout {
894 listener.front_timeout = v;
895 }
896 if let Some(v) = patch.back_timeout {
897 listener.back_timeout = v;
898 }
899 if let Some(v) = patch.max_rx_datagram_size {
900 listener.max_rx_datagram_size = v;
901 }
902 if let Some(v) = patch.max_flows {
903 listener.max_flows = v;
904 }
905 Ok(())
906 }
907
908 fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
909 match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
910 ListenerType::Http => self
911 .http_listeners
912 .get_mut(&activate.address.into())
913 .map(|listener| listener.active = true)
914 .ok_or(StateError::NotFound {
915 kind: ObjectKind::HttpListener,
916 id: activate.address.to_string(),
917 }),
918 ListenerType::Https => self
919 .https_listeners
920 .get_mut(&activate.address.into())
921 .map(|listener| listener.active = true)
922 .ok_or(StateError::NotFound {
923 kind: ObjectKind::HttpsListener,
924 id: activate.address.to_string(),
925 }),
926 ListenerType::Tcp => self
927 .tcp_listeners
928 .get_mut(&activate.address.into())
929 .map(|listener| listener.active = true)
930 .ok_or(StateError::NotFound {
931 kind: ObjectKind::TcpListener,
932 id: activate.address.to_string(),
933 }),
934 ListenerType::Udp => self
935 .udp_listeners
936 .get_mut(&activate.address.into())
937 .map(|listener| listener.active = true)
938 .ok_or(StateError::NotFound {
939 kind: ObjectKind::UdpListener,
940 id: activate.address.to_string(),
941 }),
942 }
943 }
944
945 fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
946 match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
947 ListenerType::Http => self
948 .http_listeners
949 .get_mut(&deactivate.address.into())
950 .map(|listener| listener.active = false)
951 .ok_or(StateError::NotFound {
952 kind: ObjectKind::HttpListener,
953 id: deactivate.address.to_string(),
954 }),
955 ListenerType::Https => self
956 .https_listeners
957 .get_mut(&deactivate.address.into())
958 .map(|listener| listener.active = false)
959 .ok_or(StateError::NotFound {
960 kind: ObjectKind::HttpsListener,
961 id: deactivate.address.to_string(),
962 }),
963 ListenerType::Tcp => self
964 .tcp_listeners
965 .get_mut(&deactivate.address.into())
966 .map(|listener| listener.active = false)
967 .ok_or(StateError::NotFound {
968 kind: ObjectKind::TcpListener,
969 id: deactivate.address.to_string(),
970 }),
971 ListenerType::Udp => self
972 .udp_listeners
973 .get_mut(&deactivate.address.into())
974 .map(|listener| listener.active = false)
975 .ok_or(StateError::NotFound {
976 kind: ObjectKind::UdpListener,
977 id: deactivate.address.to_string(),
978 }),
979 }
980 }
981
982 fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
983 let front_as_key = front.to_string();
984 let before = self.http_fronts.len();
985
986 match self.http_fronts.entry(front.to_string()) {
987 BTreeMapEntry::Vacant(e) => {
988 e.insert(front.clone().to_frontend().map_err(|into_error| {
989 StateError::FrontendConversion {
990 frontend: front_as_key,
991 error: into_error.to_string(),
992 }
993 })?)
994 }
995 BTreeMapEntry::Occupied(_) => {
996 debug_assert_eq!(
997 self.http_fronts.len(),
998 before,
999 "a rejected duplicate add_http_frontend must not mutate the map"
1000 );
1001 return Err(StateError::Exists {
1002 kind: ObjectKind::HttpFrontend,
1003 id: front.to_string(),
1004 });
1005 }
1006 };
1007 debug_assert!(
1010 self.http_fronts.contains_key(&front.to_string()),
1011 "add_http_frontend must insert the route key on success"
1012 );
1013 debug_assert_eq!(
1014 self.http_fronts.len(),
1015 before + 1,
1016 "add_http_frontend inserts exactly one entry on success"
1017 );
1018 Ok(())
1019 }
1020
1021 fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1022 let front_as_key = front.to_string();
1023 let before = self.https_fronts.len();
1024
1025 match self.https_fronts.entry(front.to_string()) {
1026 BTreeMapEntry::Vacant(e) => {
1027 e.insert(front.clone().to_frontend().map_err(|into_error| {
1028 StateError::FrontendConversion {
1029 frontend: front_as_key,
1030 error: into_error.to_string(),
1031 }
1032 })?)
1033 }
1034 BTreeMapEntry::Occupied(_) => {
1035 debug_assert_eq!(
1036 self.https_fronts.len(),
1037 before,
1038 "a rejected duplicate add_https_frontend must not mutate the map"
1039 );
1040 return Err(StateError::Exists {
1041 kind: ObjectKind::HttpsFrontend,
1042 id: front.to_string(),
1043 });
1044 }
1045 };
1046 debug_assert!(
1047 self.https_fronts.contains_key(&front.to_string()),
1048 "add_https_frontend must insert the route key on success"
1049 );
1050 debug_assert_eq!(
1051 self.https_fronts.len(),
1052 before + 1,
1053 "add_https_frontend inserts exactly one entry on success"
1054 );
1055 Ok(())
1056 }
1057
1058 fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1059 let key = front.to_string();
1060 let before = self.http_fronts.len();
1061 self.http_fronts.remove(&key).ok_or(StateError::NotFound {
1062 kind: ObjectKind::HttpFrontend,
1063 id: front.to_string(),
1064 })?;
1065 debug_assert!(
1066 !self.http_fronts.contains_key(&key),
1067 "remove_http_frontend must evict the route key"
1068 );
1069 debug_assert_eq!(
1070 self.http_fronts.len(),
1071 before - 1,
1072 "remove_http_frontend drops exactly one entry"
1073 );
1074 Ok(())
1075 }
1076
1077 fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
1078 let key = front.to_string();
1079 let before = self.https_fronts.len();
1080 self.https_fronts.remove(&key).ok_or(StateError::NotFound {
1081 kind: ObjectKind::HttpsFrontend,
1082 id: front.to_string(),
1083 })?;
1084 debug_assert!(
1085 !self.https_fronts.contains_key(&key),
1086 "remove_https_frontend must evict the route key"
1087 );
1088 debug_assert_eq!(
1089 self.https_fronts.len(),
1090 before - 1,
1091 "remove_https_frontend drops exactly one entry"
1092 );
1093 Ok(())
1094 }
1095
1096 fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
1097 let fingerprint = add
1098 .certificate
1099 .fingerprint()
1100 .map_err(StateError::AddCertificate)?;
1101
1102 let entry = self.certificates.entry(add.address.into()).or_default();
1103
1104 let mut add = add.clone();
1105 add.certificate
1106 .apply_overriding_names()
1107 .map_err(StateError::AddCertificate)?;
1108
1109 if entry.contains_key(&fingerprint) {
1110 info!(
1111 "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
1112 fingerprint,
1113 add.certificate.names.join(", "),
1114 add.address
1115 );
1116 return Ok(());
1117 }
1118
1119 let before = entry.len();
1120 entry.insert(fingerprint.clone(), add.certificate);
1121 debug_assert!(
1122 entry.contains_key(&fingerprint),
1123 "add_certificate must insert the fingerprint under its address"
1124 );
1125 debug_assert_eq!(
1126 entry.len(),
1127 before + 1,
1128 "add_certificate inserts exactly one fingerprint on the new path"
1129 );
1130 Ok(())
1131 }
1132
1133 fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
1134 let fingerprint = Fingerprint(
1135 hex::decode(&remove.fingerprint)
1136 .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
1137 );
1138
1139 if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
1140 index.remove(&fingerprint);
1141 debug_assert!(
1142 !index.contains_key(&fingerprint),
1143 "remove_certificate must evict the fingerprint when the address is known"
1144 );
1145 }
1146
1147 Ok(())
1148 }
1149
1150 fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
1155 let replace_address = replace.address.into();
1156 let old_fingerprint = Fingerprint(
1157 hex::decode(&replace.old_fingerprint)
1158 .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
1159 );
1160
1161 self.certificates
1162 .get_mut(&replace_address)
1163 .ok_or(StateError::NotFound {
1164 kind: ObjectKind::Certificate,
1165 id: replace.address.to_string(),
1166 })?
1167 .remove(&old_fingerprint);
1168
1169 let new_fingerprint = Fingerprint(
1170 calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
1171 |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
1172 )?,
1173 );
1174
1175 self.certificates
1176 .get_mut(&replace_address)
1177 .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
1178
1179 if !self
1180 .certificates
1181 .get(&replace_address)
1182 .ok_or(StateError::ReplaceCertificate(
1183 "Unlikely error. This entry in the certificate hashmap should be present"
1184 .to_string(),
1185 ))?
1186 .contains_key(&new_fingerprint)
1187 {
1188 return Err(StateError::ReplaceCertificate(format!(
1189 "Failed to insert the new certificate for address {}",
1190 replace.address
1191 )));
1192 }
1193 debug_assert!(
1196 self.certificates
1197 .get(&replace_address)
1198 .is_some_and(|certs| certs.contains_key(&new_fingerprint)),
1199 "replace_certificate must leave the new fingerprint present"
1200 );
1201 debug_assert!(
1202 new_fingerprint == old_fingerprint
1203 || self
1204 .certificates
1205 .get(&replace_address)
1206 .is_none_or(|certs| !certs.contains_key(&old_fingerprint)),
1207 "replace_certificate must evict the old fingerprint unless it equals the new one"
1208 );
1209 Ok(())
1210 }
1211
1212 fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
1213 let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
1214
1215 let tcp_frontend = TcpFrontend {
1216 cluster_id: front.cluster_id.clone(),
1217 address: front.address.into(),
1218 tags: front.tags.clone(),
1219 };
1220 let before = tcp_frontends.len();
1221 if tcp_frontends.contains(&tcp_frontend) {
1222 debug_assert_eq!(
1223 tcp_frontends.len(),
1224 before,
1225 "a rejected duplicate add_tcp_frontend must not grow the bucket"
1226 );
1227 return Err(StateError::Exists {
1228 kind: ObjectKind::TcpFrontend,
1229 id: format!("{tcp_frontend:?}"),
1230 });
1231 }
1232
1233 debug_assert_eq!(
1234 tcp_frontend.cluster_id, front.cluster_id,
1235 "the built frontend must carry its bucket's cluster_id"
1236 );
1237 tcp_frontends.push(tcp_frontend);
1238 debug_assert_eq!(
1239 tcp_frontends.len(),
1240 before + 1,
1241 "add_tcp_frontend appends exactly one entry"
1242 );
1243 Ok(())
1244 }
1245
1246 fn remove_tcp_frontend(
1247 &mut self,
1248 front_to_remove: &RequestTcpFrontend,
1249 ) -> Result<(), StateError> {
1250 let tcp_frontends =
1251 self.tcp_fronts
1252 .get_mut(&front_to_remove.cluster_id)
1253 .ok_or(StateError::NotFound {
1254 kind: ObjectKind::TcpFrontend,
1255 id: format!("{front_to_remove:?}"),
1256 })?;
1257
1258 let len = tcp_frontends.len();
1259 let remove_address: SocketAddr = front_to_remove.address.into();
1260 tcp_frontends.retain(|front| front.address != remove_address);
1261 let after = tcp_frontends.len();
1262 if after == len {
1263 return Err(StateError::NoChange);
1264 }
1265 debug_assert_eq!(
1269 after,
1270 len - 1,
1271 "remove_tcp_frontend drops exactly one entry"
1272 );
1273 debug_assert!(
1274 !tcp_frontends.iter().any(|f| f.address == remove_address),
1275 "remove_tcp_frontend must leave no frontend at the removed address"
1276 );
1277 Ok(())
1278 }
1279
1280 fn add_udp_frontend(&mut self, front: &RequestUdpFrontend) -> Result<(), StateError> {
1281 let udp_frontends = self.udp_fronts.entry(front.cluster_id.clone()).or_default();
1282
1283 let udp_frontend = UdpFrontend {
1284 cluster_id: front.cluster_id.clone(),
1285 address: front.address.into(),
1286 tags: front.tags.clone(),
1287 };
1288 if udp_frontends.contains(&udp_frontend) {
1289 return Err(StateError::Exists {
1290 kind: ObjectKind::UdpFrontend,
1291 id: format!("{udp_frontend:?}"),
1292 });
1293 }
1294
1295 udp_frontends.push(udp_frontend);
1296 Ok(())
1297 }
1298
1299 fn remove_udp_frontend(
1300 &mut self,
1301 front_to_remove: &RequestUdpFrontend,
1302 ) -> Result<(), StateError> {
1303 let udp_frontends =
1304 self.udp_fronts
1305 .get_mut(&front_to_remove.cluster_id)
1306 .ok_or(StateError::NotFound {
1307 kind: ObjectKind::UdpFrontend,
1308 id: format!("{front_to_remove:?}"),
1309 })?;
1310
1311 let len = udp_frontends.len();
1312 udp_frontends.retain(|front| front.address != front_to_remove.address.into());
1313 if udp_frontends.len() == len {
1314 return Err(StateError::NoChange);
1315 }
1316 Ok(())
1317 }
1318
1319 fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
1320 let backend = Backend {
1321 address: add_backend.address.into(),
1322 cluster_id: add_backend.cluster_id.clone(),
1323 backend_id: add_backend.backend_id.clone(),
1324 sticky_id: add_backend.sticky_id.clone(),
1325 load_balancing_parameters: add_backend.load_balancing_parameters,
1326 backup: add_backend.backup,
1327 };
1328 let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
1329 let backend_id = backend.backend_id.clone();
1330 let backend_address = backend.address;
1331 let before = backends.len();
1332
1333 let was_present = backends
1338 .iter()
1339 .any(|b| b.backend_id == backend_id && b.address == backend_address);
1340 backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
1341 debug_assert_eq!(
1342 backends.len(),
1343 before - was_present as usize,
1344 "the upsert retain must drop exactly the prior copy iff it existed"
1345 );
1346 backends.push(backend);
1347 backends.sort();
1348
1349 debug_assert_eq!(
1350 backends.len(),
1351 before + (!was_present) as usize,
1352 "add_backend grows the bucket by one iff the backend was new"
1353 );
1354 debug_assert_eq!(
1355 backends
1356 .iter()
1357 .filter(|b| b.backend_id == backend_id && b.address == backend_address)
1358 .count(),
1359 1,
1360 "exactly one copy of the upserted backend must remain"
1361 );
1362 Ok(())
1363 }
1364
1365 fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
1366 let backend_list =
1367 self.backends
1368 .get_mut(&backend.cluster_id)
1369 .ok_or(StateError::NotFound {
1370 kind: ObjectKind::Backend,
1371 id: backend.backend_id.to_owned(),
1372 })?;
1373
1374 let len = backend_list.len();
1375 let remove_address: SocketAddr = backend.address.into();
1376 backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
1377 backend_list.sort();
1378 let after = backend_list.len();
1379 if after == len {
1380 return Err(StateError::NoChange);
1381 }
1382 debug_assert_eq!(after, len - 1, "remove_backend drops exactly one entry");
1385 debug_assert!(
1386 !backend_list
1387 .iter()
1388 .any(|b| b.backend_id == backend.backend_id && b.address == remove_address),
1389 "remove_backend must leave no backend matching (backend_id, address)"
1390 );
1391 Ok(())
1392 }
1393
1394 fn generate_requests(&self) -> Vec<Request> {
1396 let mut v: Vec<Request> = Vec::new();
1397
1398 for listener in self.http_listeners.values() {
1399 v.push(RequestType::AddHttpListener(listener.clone()).into());
1400 if listener.active {
1401 v.push(
1402 RequestType::ActivateListener(ActivateListener {
1403 address: listener.address,
1404 proxy: ListenerType::Http.into(),
1405 from_scm: false,
1406 })
1407 .into(),
1408 );
1409 }
1410 }
1411
1412 for listener in self.https_listeners.values() {
1413 v.push(RequestType::AddHttpsListener(listener.clone()).into());
1414 if listener.active {
1415 v.push(
1416 RequestType::ActivateListener(ActivateListener {
1417 address: listener.address,
1418 proxy: ListenerType::Https.into(),
1419 from_scm: false,
1420 })
1421 .into(),
1422 );
1423 }
1424 }
1425
1426 for listener in self.tcp_listeners.values() {
1427 v.push(RequestType::AddTcpListener(*listener).into());
1428 if listener.active {
1429 v.push(
1430 RequestType::ActivateListener(ActivateListener {
1431 address: listener.address,
1432 proxy: ListenerType::Tcp.into(),
1433 from_scm: false,
1434 })
1435 .into(),
1436 );
1437 }
1438 }
1439
1440 for listener in self.udp_listeners.values() {
1441 v.push(RequestType::AddUdpListener(*listener).into());
1442 if listener.active {
1443 v.push(
1444 RequestType::ActivateListener(ActivateListener {
1445 address: listener.address,
1446 proxy: ListenerType::Udp.into(),
1447 from_scm: false,
1448 })
1449 .into(),
1450 );
1451 }
1452 }
1453
1454 for cluster in self.clusters.values() {
1455 v.push(RequestType::AddCluster(cluster.clone()).into());
1456 }
1457
1458 for front in self.http_fronts.values() {
1459 v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
1460 }
1461
1462 for (front, certs) in self.certificates.iter() {
1463 for certificate_and_key in certs.values() {
1464 v.push(
1465 RequestType::AddCertificate(AddCertificate {
1466 address: SocketAddress::from(*front),
1467 certificate: certificate_and_key.clone(),
1468 expired_at: None,
1469 })
1470 .into(),
1471 );
1472 }
1473 }
1474
1475 for front in self.https_fronts.values() {
1476 v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
1477 }
1478
1479 for front_list in self.tcp_fronts.values() {
1480 for front in front_list {
1481 v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
1482 }
1483 }
1484
1485 for front_list in self.udp_fronts.values() {
1486 for front in front_list {
1487 v.push(RequestType::AddUdpFrontend(front.clone().into()).into());
1488 }
1489 }
1490
1491 for backend_list in self.backends.values() {
1492 for backend in backend_list {
1493 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
1494 }
1495 }
1496
1497 #[cfg(debug_assertions)]
1503 {
1504 let mut replayed = ConfigState::new();
1505 for request in &v {
1506 debug_assert!(
1507 replayed.dispatch(request).is_ok(),
1508 "every request from generate_requests must replay cleanly"
1509 );
1510 }
1511 debug_assert!(
1512 replayed.clusters == self.clusters
1513 && replayed.backends == self.backends
1514 && replayed.http_listeners == self.http_listeners
1515 && replayed.https_listeners == self.https_listeners
1516 && replayed.tcp_listeners == self.tcp_listeners
1517 && replayed.http_fronts == self.http_fronts
1518 && replayed.https_fronts == self.https_fronts
1519 && replayed.tcp_fronts == self.tcp_fronts
1520 && replayed.certificates == self.certificates,
1521 "replaying generate_requests into a fresh state must reproduce self"
1522 );
1523 }
1524
1525 v
1526 }
1527
1528 pub fn generate_activate_requests(&self) -> Vec<Request> {
1529 let mut v: Vec<Request> = Vec::new();
1530 for front in self
1531 .http_listeners
1532 .iter()
1533 .filter(|(_, listener)| listener.active)
1534 .map(|(k, _)| k)
1535 {
1536 v.push(
1537 RequestType::ActivateListener(ActivateListener {
1538 address: SocketAddress::from(*front),
1539 proxy: ListenerType::Http.into(),
1540 from_scm: false,
1541 })
1542 .into(),
1543 );
1544 }
1545
1546 for front in self
1547 .https_listeners
1548 .iter()
1549 .filter(|(_, listener)| listener.active)
1550 .map(|(k, _)| k)
1551 {
1552 v.push(
1553 RequestType::ActivateListener(ActivateListener {
1554 address: SocketAddress::from(*front),
1555 proxy: ListenerType::Https.into(),
1556 from_scm: false,
1557 })
1558 .into(),
1559 );
1560 }
1561 for front in self
1562 .tcp_listeners
1563 .iter()
1564 .filter(|(_, listener)| listener.active)
1565 .map(|(k, _)| k)
1566 {
1567 v.push(
1568 RequestType::ActivateListener(ActivateListener {
1569 address: SocketAddress::from(*front),
1570 proxy: ListenerType::Tcp.into(),
1571 from_scm: false,
1572 })
1573 .into(),
1574 );
1575 }
1576 for front in self
1577 .udp_listeners
1578 .iter()
1579 .filter(|(_, listener)| listener.active)
1580 .map(|(k, _)| k)
1581 {
1582 v.push(
1583 RequestType::ActivateListener(ActivateListener {
1584 address: SocketAddress::from(*front),
1585 proxy: ListenerType::Udp.into(),
1586 from_scm: false,
1587 })
1588 .into(),
1589 );
1590 }
1591
1592 #[cfg(debug_assertions)]
1595 {
1596 let active_listeners = self.http_listeners.values().filter(|l| l.active).count()
1597 + self.https_listeners.values().filter(|l| l.active).count()
1598 + self.tcp_listeners.values().filter(|l| l.active).count()
1599 + self.udp_listeners.values().filter(|l| l.active).count();
1600 debug_assert_eq!(
1601 v.len(),
1602 active_listeners,
1603 "generate_activate_requests emits one request per active listener"
1604 );
1605 debug_assert!(
1606 v.iter()
1607 .all(|r| matches!(r.request_type, Some(RequestType::ActivateListener(_)))),
1608 "generate_activate_requests must emit only ActivateListener requests"
1609 );
1610 }
1611
1612 v
1613 }
1614
1615 pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
1616 let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
1618 let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
1619 let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
1620 let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);
1621
1622 let my_udp_listeners: HashSet<&SocketAddr> = self.udp_listeners.keys().collect();
1623 let their_udp_listeners: HashSet<&SocketAddr> = other.udp_listeners.keys().collect();
1624 let removed_udp_listeners = my_udp_listeners.difference(&their_udp_listeners);
1625 let added_udp_listeners = their_udp_listeners.difference(&my_udp_listeners);
1626
1627 let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
1628 let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
1629 let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
1630 let added_http_listeners = their_http_listeners.difference(&my_http_listeners);
1631
1632 let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
1633 let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
1634 let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
1635 let added_https_listeners = their_https_listeners.difference(&my_https_listeners);
1636
1637 let mut v: Vec<Request> = vec![];
1638
1639 for address in removed_tcp_listeners {
1640 if self.tcp_listeners[*address].active {
1641 v.push(
1642 RequestType::DeactivateListener(DeactivateListener {
1643 address: SocketAddress::from(**address),
1644 proxy: ListenerType::Tcp.into(),
1645 to_scm: false,
1646 })
1647 .into(),
1648 );
1649 }
1650
1651 v.push(
1652 RequestType::RemoveListener(RemoveListener {
1653 address: SocketAddress::from(**address),
1654 proxy: ListenerType::Tcp.into(),
1655 })
1656 .into(),
1657 );
1658 }
1659
1660 for address in added_tcp_listeners.clone() {
1661 v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());
1662
1663 if other.tcp_listeners[*address].active {
1664 v.push(
1665 RequestType::ActivateListener(ActivateListener {
1666 address: SocketAddress::from(**address),
1667 proxy: ListenerType::Tcp.into(),
1668 from_scm: false,
1669 })
1670 .into(),
1671 );
1672 }
1673 }
1674
1675 for address in removed_udp_listeners {
1676 if self.udp_listeners[*address].active {
1677 v.push(
1678 RequestType::DeactivateListener(DeactivateListener {
1679 address: SocketAddress::from(**address),
1680 proxy: ListenerType::Udp.into(),
1681 to_scm: false,
1682 })
1683 .into(),
1684 );
1685 }
1686
1687 v.push(
1688 RequestType::RemoveListener(RemoveListener {
1689 address: SocketAddress::from(**address),
1690 proxy: ListenerType::Udp.into(),
1691 })
1692 .into(),
1693 );
1694 }
1695
1696 for address in added_udp_listeners.clone() {
1697 v.push(RequestType::AddUdpListener(other.udp_listeners[*address]).into());
1698
1699 if other.udp_listeners[*address].active {
1700 v.push(
1701 RequestType::ActivateListener(ActivateListener {
1702 address: SocketAddress::from(**address),
1703 proxy: ListenerType::Udp.into(),
1704 from_scm: false,
1705 })
1706 .into(),
1707 );
1708 }
1709 }
1710
1711 for address in removed_http_listeners {
1712 if self.http_listeners[*address].active {
1713 v.push(
1714 RequestType::DeactivateListener(DeactivateListener {
1715 address: SocketAddress::from(**address),
1716 proxy: ListenerType::Http.into(),
1717 to_scm: false,
1718 })
1719 .into(),
1720 );
1721 }
1722
1723 v.push(
1724 RequestType::RemoveListener(RemoveListener {
1725 address: SocketAddress::from(**address),
1726 proxy: ListenerType::Http.into(),
1727 })
1728 .into(),
1729 );
1730 }
1731
1732 for address in added_http_listeners.clone() {
1733 v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());
1734
1735 if other.http_listeners[*address].active {
1736 v.push(
1737 RequestType::ActivateListener(ActivateListener {
1738 address: SocketAddress::from(**address),
1739 proxy: ListenerType::Http.into(),
1740 from_scm: false,
1741 })
1742 .into(),
1743 );
1744 }
1745 }
1746
1747 for address in removed_https_listeners {
1748 if self.https_listeners[*address].active {
1749 v.push(
1750 RequestType::DeactivateListener(DeactivateListener {
1751 address: SocketAddress::from(**address),
1752 proxy: ListenerType::Https.into(),
1753 to_scm: false,
1754 })
1755 .into(),
1756 );
1757 }
1758
1759 v.push(
1760 RequestType::RemoveListener(RemoveListener {
1761 address: SocketAddress::from(**address),
1762 proxy: ListenerType::Https.into(),
1763 })
1764 .into(),
1765 );
1766 }
1767
1768 for address in added_https_listeners.clone() {
1769 v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());
1770
1771 if other.https_listeners[*address].active {
1772 v.push(
1773 RequestType::ActivateListener(ActivateListener {
1774 address: SocketAddress::from(**address),
1775 proxy: ListenerType::Https.into(),
1776 from_scm: false,
1777 })
1778 .into(),
1779 );
1780 }
1781 }
1782
1783 for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
1784 let my_listener = &self.tcp_listeners[*addr];
1785 let their_listener = &other.tcp_listeners[*addr];
1786
1787 if my_listener != their_listener {
1788 v.push(
1789 RequestType::RemoveListener(RemoveListener {
1790 address: SocketAddress::from(**addr),
1791 proxy: ListenerType::Tcp.into(),
1792 })
1793 .into(),
1794 );
1795 let mut listener_to_add = *their_listener;
1797 listener_to_add.active = false;
1798 v.push(RequestType::AddTcpListener(listener_to_add).into());
1799
1800 if their_listener.active {
1807 v.push(
1808 RequestType::ActivateListener(ActivateListener {
1809 address: SocketAddress::from(**addr),
1810 proxy: ListenerType::Tcp.into(),
1811 from_scm: false,
1812 })
1813 .into(),
1814 );
1815 }
1816 }
1817
1818 if my_listener.active && !their_listener.active {
1819 v.push(
1820 RequestType::DeactivateListener(DeactivateListener {
1821 address: SocketAddress::from(**addr),
1822 proxy: ListenerType::Tcp.into(),
1823 to_scm: false,
1824 })
1825 .into(),
1826 );
1827 }
1828 }
1829
1830 for addr in my_udp_listeners.intersection(&their_udp_listeners) {
1831 let my_listener = &self.udp_listeners[*addr];
1832 let their_listener = &other.udp_listeners[*addr];
1833
1834 if my_listener != their_listener {
1835 v.push(
1836 RequestType::RemoveListener(RemoveListener {
1837 address: SocketAddress::from(**addr),
1838 proxy: ListenerType::Udp.into(),
1839 })
1840 .into(),
1841 );
1842 let mut listener_to_add = *their_listener;
1844 listener_to_add.active = false;
1845 v.push(RequestType::AddUdpListener(listener_to_add).into());
1846
1847 if their_listener.active {
1854 v.push(
1855 RequestType::ActivateListener(ActivateListener {
1856 address: SocketAddress::from(**addr),
1857 proxy: ListenerType::Udp.into(),
1858 from_scm: false,
1859 })
1860 .into(),
1861 );
1862 }
1863 }
1864
1865 if my_listener.active && !their_listener.active {
1866 v.push(
1867 RequestType::DeactivateListener(DeactivateListener {
1868 address: SocketAddress::from(**addr),
1869 proxy: ListenerType::Udp.into(),
1870 to_scm: false,
1871 })
1872 .into(),
1873 );
1874 }
1875 }
1876
1877 for addr in my_http_listeners.intersection(&their_http_listeners) {
1878 let my_listener = &self.http_listeners[*addr];
1879 let their_listener = &other.http_listeners[*addr];
1880
1881 if my_listener != their_listener {
1882 v.push(
1883 RequestType::RemoveListener(RemoveListener {
1884 address: SocketAddress::from(**addr),
1885 proxy: ListenerType::Http.into(),
1886 })
1887 .into(),
1888 );
1889 let mut listener_to_add = their_listener.clone();
1891 listener_to_add.active = false;
1892 v.push(RequestType::AddHttpListener(listener_to_add).into());
1893
1894 if their_listener.active {
1901 v.push(
1902 RequestType::ActivateListener(ActivateListener {
1903 address: SocketAddress::from(**addr),
1904 proxy: ListenerType::Http.into(),
1905 from_scm: false,
1906 })
1907 .into(),
1908 );
1909 }
1910 }
1911
1912 if my_listener.active && !their_listener.active {
1913 v.push(
1914 RequestType::DeactivateListener(DeactivateListener {
1915 address: SocketAddress::from(**addr),
1916 proxy: ListenerType::Http.into(),
1917 to_scm: false,
1918 })
1919 .into(),
1920 );
1921 }
1922 }
1923
1924 for addr in my_https_listeners.intersection(&their_https_listeners) {
1925 let my_listener = &self.https_listeners[*addr];
1926 let their_listener = &other.https_listeners[*addr];
1927
1928 if my_listener != their_listener {
1929 v.push(
1930 RequestType::RemoveListener(RemoveListener {
1931 address: SocketAddress::from(**addr),
1932 proxy: ListenerType::Https.into(),
1933 })
1934 .into(),
1935 );
1936 let mut listener_to_add = their_listener.clone();
1938 listener_to_add.active = false;
1939 v.push(RequestType::AddHttpsListener(listener_to_add).into());
1940
1941 if their_listener.active {
1948 v.push(
1949 RequestType::ActivateListener(ActivateListener {
1950 address: SocketAddress::from(**addr),
1951 proxy: ListenerType::Https.into(),
1952 from_scm: false,
1953 })
1954 .into(),
1955 );
1956 }
1957 }
1958
1959 if my_listener.active && !their_listener.active {
1960 v.push(
1961 RequestType::DeactivateListener(DeactivateListener {
1962 address: SocketAddress::from(**addr),
1963 proxy: ListenerType::Https.into(),
1964 to_scm: false,
1965 })
1966 .into(),
1967 );
1968 }
1969 }
1970
1971 for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
1972 match res {
1973 DiffResult::Added | DiffResult::Changed => v.push(
1974 RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
1975 ),
1976 DiffResult::Removed => {
1977 v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
1978 }
1979 }
1980 }
1981
1982 for ((cluster_id, backend_id), res) in diff_map(
1983 self.backends.iter().flat_map(|(cluster_id, v)| {
1984 v.iter()
1985 .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1986 }),
1987 other.backends.iter().flat_map(|(cluster_id, v)| {
1988 v.iter()
1989 .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1990 }),
1991 ) {
1992 match res {
1993 DiffResult::Added => {
1994 let backend = other
1995 .backends
1996 .get(cluster_id)
1997 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1998 .unwrap();
1999 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
2000 }
2001 DiffResult::Removed => {
2002 let backend = self
2003 .backends
2004 .get(cluster_id)
2005 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2006 .unwrap();
2007
2008 v.push(
2009 RequestType::RemoveBackend(RemoveBackend {
2010 cluster_id: backend.cluster_id.clone(),
2011 backend_id: backend.backend_id.clone(),
2012 address: SocketAddress::from(backend.address),
2013 })
2014 .into(),
2015 );
2016 }
2017 DiffResult::Changed => {
2018 let backend = self
2019 .backends
2020 .get(cluster_id)
2021 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2022 .unwrap();
2023
2024 v.push(
2025 RequestType::RemoveBackend(RemoveBackend {
2026 cluster_id: backend.cluster_id.clone(),
2027 backend_id: backend.backend_id.clone(),
2028 address: SocketAddress::from(backend.address),
2029 })
2030 .into(),
2031 );
2032
2033 let backend = other
2034 .backends
2035 .get(cluster_id)
2036 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
2037 .unwrap();
2038 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
2039 }
2040 }
2041 }
2042
2043 let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
2044 for (route, front) in self.http_fronts.iter() {
2045 my_http_fronts.insert((route, front));
2046 }
2047 let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
2048 for (route, front) in other.http_fronts.iter() {
2049 their_http_fronts.insert((route, front));
2050 }
2051
2052 let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
2053 let added_http_fronts = their_http_fronts.difference(&my_http_fronts);
2054
2055 for &(_, front) in removed_http_fronts {
2056 v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
2057 }
2058
2059 for &(_, front) in added_http_fronts {
2060 v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
2061 }
2062
2063 let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
2064 for (route, front) in self.https_fronts.iter() {
2065 my_https_fronts.insert((route, front));
2066 }
2067 let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
2068 for (route, front) in other.https_fronts.iter() {
2069 their_https_fronts.insert((route, front));
2070 }
2071 let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
2072 let added_https_fronts = their_https_fronts.difference(&my_https_fronts);
2073
2074 for &(_, front) in removed_https_fronts {
2075 v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
2076 }
2077
2078 for &(_, front) in added_https_fronts {
2079 v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
2080 }
2081
2082 let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
2083 for (cluster_id, front_list) in self.tcp_fronts.iter() {
2084 for front in front_list.iter() {
2085 my_tcp_fronts.insert((cluster_id, front));
2086 }
2087 }
2088 let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
2089 for (cluster_id, front_list) in other.tcp_fronts.iter() {
2090 for front in front_list.iter() {
2091 their_tcp_fronts.insert((cluster_id, front));
2092 }
2093 }
2094
2095 let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
2096 let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);
2097
2098 for &(_, front) in removed_tcp_fronts {
2099 v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
2100 }
2101
2102 for &(_, front) in added_tcp_fronts {
2103 v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
2104 }
2105
2106 let mut my_udp_fronts: HashSet<(&ClusterId, &UdpFrontend)> = HashSet::new();
2107 for (cluster_id, front_list) in self.udp_fronts.iter() {
2108 for front in front_list.iter() {
2109 my_udp_fronts.insert((cluster_id, front));
2110 }
2111 }
2112 let mut their_udp_fronts: HashSet<(&ClusterId, &UdpFrontend)> = HashSet::new();
2113 for (cluster_id, front_list) in other.udp_fronts.iter() {
2114 for front in front_list.iter() {
2115 their_udp_fronts.insert((cluster_id, front));
2116 }
2117 }
2118
2119 let removed_udp_fronts = my_udp_fronts.difference(&their_udp_fronts);
2120 let added_udp_fronts = their_udp_fronts.difference(&my_udp_fronts);
2121
2122 for &(_, front) in removed_udp_fronts {
2123 v.push(RequestType::RemoveUdpFrontend(front.clone().into()).into());
2124 }
2125
2126 for &(_, front) in added_udp_fronts {
2127 v.push(RequestType::AddUdpFrontend(front.clone().into()).into());
2128 }
2129
2130 let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
2132 self.certificates
2133 .iter()
2134 .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
2135 );
2136 let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
2137 other
2138 .certificates
2139 .iter()
2140 .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
2141 );
2142
2143 let removed_certificates = my_certificates.difference(&their_certificates);
2144 let added_certificates = their_certificates.difference(&my_certificates);
2145
2146 for &(address, fingerprint) in removed_certificates {
2147 v.push(
2148 RequestType::RemoveCertificate(RemoveCertificate {
2149 address: SocketAddress::from(address),
2150 fingerprint: fingerprint.to_string(),
2151 })
2152 .into(),
2153 );
2154 }
2155
2156 for &(address, fingerprint) in added_certificates {
2157 if let Some(certificate_and_key) = other
2158 .certificates
2159 .get(&address)
2160 .and_then(|certs| certs.get(fingerprint))
2161 {
2162 v.push(
2163 RequestType::AddCertificate(AddCertificate {
2164 address: SocketAddress::from(address),
2165 certificate: certificate_and_key.clone(),
2166 expired_at: None,
2167 })
2168 .into(),
2169 );
2170 }
2171 }
2172
2173 for address in added_tcp_listeners {
2174 let listener = &other.tcp_listeners[*address];
2175 if listener.active {
2176 v.push(
2177 RequestType::ActivateListener(ActivateListener {
2178 address: listener.address,
2179 proxy: ListenerType::Tcp.into(),
2180 from_scm: false,
2181 })
2182 .into(),
2183 );
2184 }
2185 }
2186
2187 for address in added_udp_listeners {
2188 let listener = &other.udp_listeners[*address];
2189 if listener.active {
2190 v.push(
2191 RequestType::ActivateListener(ActivateListener {
2192 address: listener.address,
2193 proxy: ListenerType::Udp.into(),
2194 from_scm: false,
2195 })
2196 .into(),
2197 );
2198 }
2199 }
2200
2201 #[cfg(debug_assertions)]
2214 {
2215 let mut replayed = self.clone();
2216 for request in &v {
2217 debug_assert!(
2219 replayed.dispatch(request).is_ok(),
2220 "every request emitted by diff must replay cleanly onto self"
2221 );
2222 }
2223 let nonempty = |m: &BTreeMap<ClusterId, Vec<Backend>>| {
2224 m.iter()
2225 .filter(|(_, v)| !v.is_empty())
2226 .map(|(k, v)| (k.clone(), v.clone()))
2227 .collect::<BTreeMap<_, _>>()
2228 };
2229 let nonempty_tcp = |m: &HashMap<ClusterId, Vec<TcpFrontend>>| {
2230 m.iter()
2231 .filter(|(_, v)| !v.is_empty())
2232 .map(|(k, v)| (k.clone(), v.clone()))
2233 .collect::<HashMap<_, _>>()
2234 };
2235 debug_assert!(
2236 replayed.clusters == other.clusters
2237 && nonempty(&replayed.backends) == nonempty(&other.backends)
2238 && replayed.http_fronts == other.http_fronts
2239 && replayed.https_fronts == other.https_fronts
2240 && nonempty_tcp(&replayed.tcp_fronts) == nonempty_tcp(&other.tcp_fronts)
2241 && replayed.certificates == other.certificates
2242 && replayed.http_listeners == other.http_listeners
2243 && replayed.https_listeners == other.https_listeners
2244 && replayed.tcp_listeners == other.tcp_listeners
2245 && replayed.udp_listeners == other.udp_listeners,
2246 "replaying diff(self, other) onto self must reproduce other's clusters/backends/frontends/certificates/listeners"
2247 );
2248 }
2249
2250 v
2251 }
2252
2253 pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
2255 let mut hm: HashMap<ClusterId, DefaultHasher> = self
2256 .clusters
2257 .keys()
2258 .map(|cluster_id| {
2259 let mut hasher = DefaultHasher::new();
2260 self.clusters.get(cluster_id).hash(&mut hasher);
2261 if let Some(backends) = self.backends.get(cluster_id) {
2262 backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
2263 }
2264 if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
2265 tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
2266 }
2267 (cluster_id.to_owned(), hasher)
2268 })
2269 .collect();
2270
2271 for front in self.http_fronts.values() {
2272 if let Some(cluster_id) = &front.cluster_id {
2273 if let Some(hasher) = hm.get_mut(cluster_id) {
2274 front.hash(hasher);
2275 }
2276 }
2277 }
2278
2279 for front in self.https_fronts.values() {
2280 if let Some(cluster_id) = &front.cluster_id {
2281 if let Some(hasher) = hm.get_mut(cluster_id) {
2282 front.hash(hasher);
2283 }
2284 }
2285 }
2286
2287 hm.drain()
2288 .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
2289 .collect()
2290 }
2291
2292 pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
2295 let configuration = self.clusters.get(cluster_id).cloned()?;
2296 info!("{:?}", configuration);
2297
2298 let http_frontends: Vec<RequestHttpFrontend> = self
2299 .http_fronts
2300 .values()
2301 .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
2302 .map(|front| front.clone().into())
2303 .collect();
2304
2305 let https_frontends: Vec<RequestHttpFrontend> = self
2306 .https_fronts
2307 .values()
2308 .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
2309 .map(|front| front.clone().into())
2310 .collect();
2311
2312 let tcp_frontends: Vec<RequestTcpFrontend> = self
2313 .tcp_fronts
2314 .get(cluster_id)
2315 .cloned()
2316 .unwrap_or_default()
2317 .iter()
2318 .map(|front| front.clone().into())
2319 .collect();
2320
2321 let udp_frontends: Vec<RequestUdpFrontend> = self
2322 .udp_fronts
2323 .get(cluster_id)
2324 .cloned()
2325 .unwrap_or_default()
2326 .iter()
2327 .map(|front| front.clone().into())
2328 .collect();
2329
2330 let backends: Vec<AddBackend> = self
2331 .backends
2332 .get(cluster_id)
2333 .cloned()
2334 .unwrap_or_default()
2335 .iter()
2336 .map(|backend| backend.clone().into())
2337 .collect();
2338
2339 Some(ClusterInformation {
2340 configuration: Some(configuration),
2341 http_frontends,
2342 https_frontends,
2343 tcp_frontends,
2344 backends,
2345 udp_frontends,
2346 })
2347 }
2348
2349 pub fn count_backends(&self) -> usize {
2350 self.backends.values().fold(0, |acc, v| acc + v.len())
2351 }
2352
2353 pub fn count_frontends(&self) -> usize {
2354 self.http_fronts.values().count()
2355 + self.https_fronts.values().count()
2356 + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
2357 + self.udp_fronts.values().fold(0, |acc, v| acc + v.len())
2358 }
2359
2360 pub fn get_cluster_ids_by_domain(
2361 &self,
2362 hostname: String,
2363 path: Option<String>,
2364 ) -> HashSet<ClusterId> {
2365 let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
2366
2367 self.http_fronts.values().for_each(|front| {
2368 if domain_check(&front.hostname, &front.path, &hostname, &path) {
2369 if let Some(id) = &front.cluster_id {
2370 cluster_ids.insert(id.to_string());
2371 }
2372 }
2373 });
2374
2375 self.https_fronts.values().for_each(|front| {
2376 if domain_check(&front.hostname, &front.path, &hostname, &path) {
2377 if let Some(id) = &front.cluster_id {
2378 cluster_ids.insert(id.to_string());
2379 }
2380 }
2381 });
2382
2383 cluster_ids
2384 }
2385
2386 pub fn get_certificates(
2387 &self,
2388 filters: QueryCertificatesFilters,
2389 ) -> BTreeMap<String, CertificateAndKey> {
2390 self.certificates
2391 .values()
2392 .flat_map(|hash_map| hash_map.iter())
2393 .filter(|(fingerprint, cert)| {
2394 if let Some(domain) = &filters.domain {
2395 cert.names.contains(domain)
2396 } else if let Some(f) = &filters.fingerprint {
2397 fingerprint.to_string() == *f
2398 } else {
2399 true
2400 }
2401 })
2402 .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
2403 .collect()
2404 }
2405
2406 pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
2407 let list_all = !filters.http && !filters.https && !filters.tcp;
2409
2410 let mut listed_frontends = ListedFrontends::default();
2411
2412 if filters.http || list_all {
2413 for http_frontend in self.http_fronts.iter().filter(|f| {
2414 if let Some(domain) = &filters.domain {
2415 f.1.hostname.contains(domain)
2416 } else {
2417 true
2418 }
2419 }) {
2420 listed_frontends
2421 .http_frontends
2422 .push(http_frontend.1.to_owned().into());
2423 }
2424 }
2425
2426 if filters.https || list_all {
2427 for https_frontend in self.https_fronts.iter().filter(|f| {
2428 if let Some(domain) = &filters.domain {
2429 f.1.hostname.contains(domain)
2430 } else {
2431 true
2432 }
2433 }) {
2434 listed_frontends
2435 .https_frontends
2436 .push(https_frontend.1.to_owned().into());
2437 }
2438 }
2439
2440 if (filters.tcp || list_all) && filters.domain.is_none() {
2441 for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
2442 listed_frontends
2443 .tcp_frontends
2444 .push(tcp_frontend.to_owned().into())
2445 }
2446 }
2447
2448 if (filters.tcp || list_all) && filters.domain.is_none() {
2454 for udp_frontend in self.udp_fronts.values().flat_map(|v| v.iter()) {
2455 listed_frontends
2456 .udp_frontends
2457 .push(udp_frontend.to_owned().into())
2458 }
2459 }
2460
2461 listed_frontends
2462 }
2463
2464 pub fn list_listeners(&self) -> ListenersList {
2465 ListenersList {
2466 http_listeners: self
2467 .http_listeners
2468 .iter()
2469 .map(|(addr, listener)| (addr.to_string(), listener.clone()))
2470 .collect(),
2471 https_listeners: self
2472 .https_listeners
2473 .iter()
2474 .map(|(addr, listener)| (addr.to_string(), listener.clone()))
2475 .collect(),
2476 tcp_listeners: self
2477 .tcp_listeners
2478 .iter()
2479 .map(|(addr, listener)| (addr.to_string(), *listener))
2480 .collect(),
2481 udp_listeners: self
2482 .udp_listeners
2483 .iter()
2484 .map(|(addr, listener)| (addr.to_string(), *listener))
2485 .collect(),
2486 }
2487 }
2488
2489 pub fn produce_initial_state(&self) -> InitialState {
2491 let mut worker_requests = Vec::new();
2492 for (counter, request) in self.generate_requests().into_iter().enumerate() {
2493 worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
2494 }
2495 InitialState {
2496 requests: worker_requests,
2497 }
2498 }
2499
2500 pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
2503 let initial_state = self.produce_initial_state();
2504 let count = initial_state.requests.len();
2505
2506 let bytes_to_write = initial_state.encode_to_vec();
2507 println!("writing {} in the temp file", bytes_to_write.len());
2508 file.write_all(&bytes_to_write)
2509 .map_err(StateError::FileError)?;
2510
2511 file.sync_all().map_err(StateError::FileError)?;
2512
2513 Ok(count)
2514 }
2515
2516 pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
2520 let mut counter = 0usize;
2521 let requests = self.generate_requests();
2522
2523 for request in requests {
2524 let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
2525
2526 file.write_all(
2527 &serde_json::to_string(&message)
2528 .map(|s| s.into_bytes())
2529 .unwrap_or_default(),
2530 )
2531 .map_err(StateError::FileError)?;
2532
2533 file.write_all(&b"\n\0"[..])
2534 .map_err(StateError::FileError)?;
2535
2536 if counter % 1000 == 0 {
2537 info!("writing {} commands to file", counter);
2538 file.sync_all().map_err(StateError::FileError)?;
2539 }
2540 counter += 1;
2541 }
2542 file.sync_all().map_err(StateError::FileError)?;
2543
2544 Ok(counter)
2545 }
2546}
2547
2548pub fn validate_h2_flood_knobs_http(patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
2562 macro_rules! require_ge1 {
2563 ($field:expr, $name:literal) => {
2564 if let Some(0) = $field {
2565 return Err(StateError::InvalidValue {
2566 field: $name,
2567 reason: "must be >= 1",
2568 });
2569 }
2570 };
2571 }
2572 require_ge1!(
2573 patch.h2_max_rst_stream_per_window,
2574 "h2_max_rst_stream_per_window"
2575 );
2576 require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
2577 require_ge1!(
2578 patch.h2_max_settings_per_window,
2579 "h2_max_settings_per_window"
2580 );
2581 require_ge1!(
2582 patch.h2_max_empty_data_per_window,
2583 "h2_max_empty_data_per_window"
2584 );
2585 require_ge1!(
2586 patch.h2_max_continuation_frames,
2587 "h2_max_continuation_frames"
2588 );
2589 require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
2590 require_ge1!(
2591 patch.h2_max_window_update_stream0_per_window,
2592 "h2_max_window_update_stream0_per_window"
2593 );
2594 require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
2595 if let Some(v) = patch.h2_stream_shrink_ratio {
2598 if v < 2 {
2599 return Err(StateError::InvalidValue {
2600 field: "h2_stream_shrink_ratio",
2601 reason: "must be >= 2",
2602 });
2603 }
2604 }
2605 require_ge1!(
2608 patch.h2_max_rst_stream_lifetime,
2609 "h2_max_rst_stream_lifetime"
2610 );
2611 require_ge1!(
2612 patch.h2_max_rst_stream_abusive_lifetime,
2613 "h2_max_rst_stream_abusive_lifetime"
2614 );
2615 require_ge1!(
2616 patch.h2_max_rst_stream_emitted_lifetime,
2617 "h2_max_rst_stream_emitted_lifetime"
2618 );
2619 require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
2620 require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
2621 require_ge1!(patch.h2_max_header_fields, "h2_max_header_fields");
2622 Ok(())
2623}
2624
2625pub fn validate_h2_flood_knobs_https(patch: &UpdateHttpsListenerConfig) -> Result<(), StateError> {
2627 macro_rules! require_ge1 {
2628 ($field:expr, $name:literal) => {
2629 if let Some(0) = $field {
2630 return Err(StateError::InvalidValue {
2631 field: $name,
2632 reason: "must be >= 1",
2633 });
2634 }
2635 };
2636 }
2637 require_ge1!(
2638 patch.h2_max_rst_stream_per_window,
2639 "h2_max_rst_stream_per_window"
2640 );
2641 require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
2642 require_ge1!(
2643 patch.h2_max_settings_per_window,
2644 "h2_max_settings_per_window"
2645 );
2646 require_ge1!(
2647 patch.h2_max_empty_data_per_window,
2648 "h2_max_empty_data_per_window"
2649 );
2650 require_ge1!(
2651 patch.h2_max_continuation_frames,
2652 "h2_max_continuation_frames"
2653 );
2654 require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
2655 require_ge1!(
2656 patch.h2_max_window_update_stream0_per_window,
2657 "h2_max_window_update_stream0_per_window"
2658 );
2659 require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
2660 if let Some(v) = patch.h2_stream_shrink_ratio {
2661 if v < 2 {
2662 return Err(StateError::InvalidValue {
2663 field: "h2_stream_shrink_ratio",
2664 reason: "must be >= 2",
2665 });
2666 }
2667 }
2668 require_ge1!(
2669 patch.h2_max_rst_stream_lifetime,
2670 "h2_max_rst_stream_lifetime"
2671 );
2672 require_ge1!(
2673 patch.h2_max_rst_stream_abusive_lifetime,
2674 "h2_max_rst_stream_abusive_lifetime"
2675 );
2676 require_ge1!(
2677 patch.h2_max_rst_stream_emitted_lifetime,
2678 "h2_max_rst_stream_emitted_lifetime"
2679 );
2680 require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
2681 require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
2682 require_ge1!(patch.h2_max_header_fields, "h2_max_header_fields");
2683 Ok(())
2684}
2685
2686pub fn merge_custom_http_answers(
2699 target: &mut Option<CustomHttpAnswers>,
2700 patch: &CustomHttpAnswers,
2701) {
2702 let current = target.get_or_insert_with(CustomHttpAnswers::default);
2703 macro_rules! merge_field {
2704 ($field:ident) => {
2705 if let Some(ref v) = patch.$field {
2706 current.$field = Some(v.clone());
2707 }
2708 };
2709 }
2710 merge_field!(answer_301);
2711 merge_field!(answer_400);
2712 merge_field!(answer_401);
2713 merge_field!(answer_404);
2714 merge_field!(answer_408);
2715 merge_field!(answer_413);
2716 merge_field!(answer_421);
2717 merge_field!(answer_502);
2718 merge_field!(answer_503);
2719 merge_field!(answer_504);
2720 merge_field!(answer_507);
2721}
2722
2723pub fn validate_alpn_protocols(values: &[String]) -> Result<(), StateError> {
2726 for value in values {
2727 if value != "h2" && value != "http/1.1" {
2728 return Err(StateError::InvalidValue {
2729 field: "alpn_protocols",
2730 reason: "each value must be \"h2\" or \"http/1.1\"",
2731 });
2732 }
2733 }
2734 Ok(())
2735}
2736
2737pub fn validate_sozu_id_header(value: &str) -> Result<(), StateError> {
2748 if value.is_empty() {
2749 return Err(StateError::InvalidValue {
2750 field: "sozu_id_header",
2751 reason: "must not be empty",
2752 });
2753 }
2754 for b in value.bytes() {
2755 let is_tchar = b.is_ascii_alphanumeric()
2756 || matches!(
2757 b,
2758 b'!' | b'#'
2759 | b'$'
2760 | b'%'
2761 | b'&'
2762 | b'\''
2763 | b'*'
2764 | b'+'
2765 | b'-'
2766 | b'.'
2767 | b'^'
2768 | b'_'
2769 | b'`'
2770 | b'|'
2771 | b'~'
2772 );
2773 if !is_tchar {
2774 return Err(StateError::InvalidValue {
2775 field: "sozu_id_header",
2776 reason: "must be a valid HTTP header name (RFC 9110 §5.1 token: alphanumeric or one of !#$%&'*+-.^_`|~)",
2777 });
2778 }
2779 }
2780 Ok(())
2781}
2782
2783fn domain_check(
2784 front_hostname: &str,
2785 front_path_rule: &PathRule,
2786 hostname: &str,
2787 path_prefix: &Option<String>,
2788) -> bool {
2789 if hostname != front_hostname {
2790 return false;
2791 }
2792
2793 if let Some(path) = &path_prefix {
2794 return path == &front_path_rule.value;
2795 }
2796
2797 true
2798}
2799
/// Iterator state for a merge-style comparison of two key/value sequences.
///
/// `my_it` / `other_it` are the two underlying iterators; `my` / `other` hold an entry that
/// has been pulled from its iterator but not reported yet.
struct DiffMap<'a, K: Ord, V, I1, I2> {
    my_it: I1,
    other_it: I2,
    my: Option<(K, &'a V)>,
    other: Option<(K, &'a V)>,
}

/// Builds a [`DiffMap`] over `my` and `other`.
///
/// The comparison walks both iterators in lockstep, so it only yields a meaningful diff when
/// both produce their keys in ascending order (as `BTreeMap::iter` does).
fn diff_map<'a, K, V, I1, I2>(my: I1, other: I2) -> DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    DiffMap {
        my_it: my,
        other_it: other,
        my: None,
        other: None,
    }
}

/// How a key differs between the "my" side and the "other" side of a [`DiffMap`].
enum DiffResult {
    /// The key exists only on the "other" side.
    Added,
    /// The key exists only on the "my" side.
    Removed,
    /// The key exists on both sides, with different values.
    Changed,
}

impl<'a, K, V, I1, I2> Iterator for DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    type Item = (K, DiffResult);

    /// Yields the next key that differs; keys present on both sides with equal values are
    /// skipped silently.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Reuse an entry held back by a previous call, otherwise pull a fresh one.
            // The "my" side is always polled before the "other" side.
            let mine = match self.my.take() {
                Some(pending) => Some(pending),
                None => self.my_it.next(),
            };
            let theirs = match self.other.take() {
                Some(pending) => Some(pending),
                None => self.other_it.next(),
            };

            let ((my_key, my_value), (other_key, other_value)) = match (mine, theirs) {
                (None, None) => return None,
                (None, Some((key, _))) => return Some((key, DiffResult::Added)),
                (Some((key, _)), None) => return Some((key, DiffResult::Removed)),
                (Some(my_entry), Some(other_entry)) => (my_entry, other_entry),
            };

            if my_key < other_key {
                // "other" is ahead: hold its entry back and report ours as removed.
                self.other = Some((other_key, other_value));
                return Some((my_key, DiffResult::Removed));
            } else if my_key > other_key {
                // "my" is ahead: hold our entry back and report theirs as added.
                self.my = Some((my_key, my_value));
                return Some((other_key, DiffResult::Added));
            } else if my_value != other_value {
                return Some((my_key, DiffResult::Changed));
            }
            // Same key, same value: both entries are consumed, keep scanning.
        }
    }
}
2876
2877#[cfg(test)]
2878mod tests {
2879 use rand::{RngExt, rng, seq::SliceRandom};
2880
2881 use super::*;
2882 use crate::proto::command::{
2883 CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RequestTcpFrontend,
2884 RequestUdpFrontend, RulePosition, UdpListenerConfig, UpdateUdpListenerConfig,
2885 };
2886
2887 #[test]
2888 fn serialize() {
2889 let mut state: ConfigState = Default::default();
2890 state
2891 .dispatch(
2892 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2893 cluster_id: Some(String::from("cluster_1")),
2894 hostname: String::from("lolcatho.st:8080"),
2895 path: PathRule::prefix(String::from("/")),
2896 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2897 position: RulePosition::Tree.into(),
2898 ..Default::default()
2899 })
2900 .into(),
2901 )
2902 .expect("Could not execute request");
2903 state
2904 .dispatch(
2905 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2906 cluster_id: Some(String::from("cluster_2")),
2907 hostname: String::from("test.local"),
2908 path: PathRule::prefix(String::from("/abc")),
2909 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2910 position: RulePosition::Pre.into(),
2911 ..Default::default()
2912 })
2913 .into(),
2914 )
2915 .expect("Could not execute request");
2916 state
2917 .dispatch(
2918 &RequestType::AddBackend(AddBackend {
2919 cluster_id: String::from("cluster_1"),
2920 backend_id: String::from("cluster_1-0"),
2921 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2922 ..Default::default()
2923 })
2924 .into(),
2925 )
2926 .expect("Could not execute request");
2927 state
2928 .dispatch(
2929 &RequestType::AddBackend(AddBackend {
2930 cluster_id: String::from("cluster_1"),
2931 backend_id: String::from("cluster_1-1"),
2932 address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
2933 ..Default::default()
2934 })
2935 .into(),
2936 )
2937 .expect("Could not execute request");
2938 state
2939 .dispatch(
2940 &RequestType::AddBackend(AddBackend {
2941 cluster_id: String::from("cluster_2"),
2942 backend_id: String::from("cluster_2-0"),
2943 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2944 ..Default::default()
2945 })
2946 .into(),
2947 )
2948 .expect("Could not execute request");
2949 state
2950 .dispatch(
2951 &RequestType::AddBackend(AddBackend {
2952 cluster_id: String::from("cluster_1"),
2953 backend_id: String::from("cluster_1-3"),
2954 address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2955 ..Default::default()
2956 })
2957 .into(),
2958 )
2959 .expect("Could not execute request");
2960 state
2961 .dispatch(
2962 &RequestType::RemoveBackend(RemoveBackend {
2963 cluster_id: String::from("cluster_1"),
2964 backend_id: String::from("cluster_1-3"),
2965 address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2966 })
2967 .into(),
2968 )
2969 .expect("Could not execute request");
2970
2971 }
2981
2982 #[test]
2983 fn diff() {
2984 let mut state: ConfigState = Default::default();
2985 state
2986 .dispatch(
2987 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2988 cluster_id: Some(String::from("cluster_1")),
2989 hostname: String::from("lolcatho.st:8080"),
2990 path: PathRule::prefix(String::from("/")),
2991 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2992 position: RulePosition::Post.into(),
2993 ..Default::default()
2994 })
2995 .into(),
2996 )
2997 .expect("Could not execute request");
2998 state
2999 .dispatch(
3000 &RequestType::AddHttpFrontend(RequestHttpFrontend {
3001 cluster_id: Some(String::from("cluster_2")),
3002 hostname: String::from("test.local"),
3003 path: PathRule::prefix(String::from("/abc")),
3004 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3005 ..Default::default()
3006 })
3007 .into(),
3008 )
3009 .expect("Could not execute request");
3010 state
3011 .dispatch(
3012 &RequestType::AddBackend(AddBackend {
3013 cluster_id: String::from("cluster_1"),
3014 backend_id: String::from("cluster_1-0"),
3015 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3016 load_balancing_parameters: Some(LoadBalancingParams::default()),
3017 ..Default::default()
3018 })
3019 .into(),
3020 )
3021 .expect("Could not execute request");
3022 state
3023 .dispatch(
3024 &RequestType::AddBackend(AddBackend {
3025 cluster_id: String::from("cluster_1"),
3026 backend_id: String::from("cluster_1-1"),
3027 address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
3028 load_balancing_parameters: Some(LoadBalancingParams::default()),
3029 ..Default::default()
3030 })
3031 .into(),
3032 )
3033 .expect("Could not execute request");
3034 state
3035 .dispatch(
3036 &RequestType::AddBackend(AddBackend {
3037 cluster_id: String::from("cluster_2"),
3038 backend_id: String::from("cluster_2-0"),
3039 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
3040 load_balancing_parameters: Some(LoadBalancingParams::default()),
3041 ..Default::default()
3042 })
3043 .into(),
3044 )
3045 .expect("Could not execute request");
3046 state
3047 .dispatch(
3048 &RequestType::AddCluster(Cluster {
3049 cluster_id: String::from("cluster_2"),
3050 sticky_session: true,
3051 https_redirect: true,
3052 ..Default::default()
3053 })
3054 .into(),
3055 )
3056 .expect("Could not execute request");
3057
3058 let mut state2: ConfigState = Default::default();
3059 state2
3060 .dispatch(
3061 &RequestType::AddHttpFrontend(RequestHttpFrontend {
3062 cluster_id: Some(String::from("cluster_1")),
3063 hostname: String::from("lolcatho.st:8080"),
3064 path: PathRule::prefix(String::from("/")),
3065 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3066 position: RulePosition::Post.into(),
3067 ..Default::default()
3068 })
3069 .into(),
3070 )
3071 .expect("Could not execute request");
3072 state2
3073 .dispatch(
3074 &RequestType::AddBackend(AddBackend {
3075 cluster_id: String::from("cluster_1"),
3076 backend_id: String::from("cluster_1-0"),
3077 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3078 load_balancing_parameters: Some(LoadBalancingParams::default()),
3079 ..Default::default()
3080 })
3081 .into(),
3082 )
3083 .expect("Could not execute request");
3084 state2
3085 .dispatch(
3086 &RequestType::AddBackend(AddBackend {
3087 cluster_id: String::from("cluster_1"),
3088 backend_id: String::from("cluster_1-1"),
3089 address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
3090 load_balancing_parameters: Some(LoadBalancingParams::default()),
3091 ..Default::default()
3092 })
3093 .into(),
3094 )
3095 .expect("Could not execute request");
3096 state2
3097 .dispatch(
3098 &RequestType::AddBackend(AddBackend {
3099 cluster_id: String::from("cluster_1"),
3100 backend_id: String::from("cluster_1-2"),
3101 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3102 load_balancing_parameters: Some(LoadBalancingParams::default()),
3103 ..Default::default()
3104 })
3105 .into(),
3106 )
3107 .expect("Could not execute request");
3108 state2
3109 .dispatch(
3110 &RequestType::AddCluster(Cluster {
3111 cluster_id: String::from("cluster_3"),
3112 sticky_session: false,
3113 https_redirect: false,
3114 ..Default::default()
3115 })
3116 .into(),
3117 )
3118 .expect("Could not execute request");
3119
3120 let e: Vec<Request> = vec![
3121 RequestType::RemoveHttpFrontend(RequestHttpFrontend {
3122 cluster_id: Some(String::from("cluster_2")),
3123 hostname: String::from("test.local"),
3124 path: PathRule::prefix(String::from("/abc")),
3125 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3126 ..Default::default()
3127 })
3128 .into(),
3129 RequestType::RemoveBackend(RemoveBackend {
3130 cluster_id: String::from("cluster_2"),
3131 backend_id: String::from("cluster_2-0"),
3132 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
3133 })
3134 .into(),
3135 RequestType::AddBackend(AddBackend {
3136 cluster_id: String::from("cluster_1"),
3137 backend_id: String::from("cluster_1-2"),
3138 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3139 load_balancing_parameters: Some(LoadBalancingParams::default()),
3140 ..Default::default()
3141 })
3142 .into(),
3143 RequestType::RemoveCluster(String::from("cluster_2")).into(),
3144 RequestType::AddCluster(Cluster {
3145 cluster_id: String::from("cluster_3"),
3146 sticky_session: false,
3147 https_redirect: false,
3148 ..Default::default()
3149 })
3150 .into(),
3151 ];
3152 let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
3153
3154 let d = state.diff(&state2);
3155 let diff = HashSet::from_iter(d.iter());
3156 println!("diff requests:\n{diff:#?}\n");
3157 println!("expected diff requests:\n{expected_diff:#?}\n");
3158
3159 let hash1 = state.hash_state();
3160 let hash2 = state2.hash_state();
3161 let mut state3 = state.clone();
3162 state3
3163 .dispatch(
3164 &RequestType::AddBackend(AddBackend {
3165 cluster_id: String::from("cluster_1"),
3166 backend_id: String::from("cluster_1-2"),
3167 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
3168 load_balancing_parameters: Some(LoadBalancingParams::default()),
3169 ..Default::default()
3170 })
3171 .into(),
3172 )
3173 .expect("Could not execute request");
3174 let hash3 = state3.hash_state();
3175 println!("state 1 hashes: {hash1:#?}");
3176 println!("state 2 hashes: {hash2:#?}");
3177 println!("state 3 hashes: {hash3:#?}");
3178
3179 assert_eq!(diff, expected_diff);
3180 }
3181
3182 #[test]
3183 fn cluster_ids_by_domain() {
3184 let mut config = ConfigState::new();
3185 let http_front_cluster1 = RequestHttpFrontend {
3186 cluster_id: Some(String::from("MyCluster_1")),
3187 hostname: String::from("lolcatho.st"),
3188 path: PathRule::prefix(String::from("")),
3189 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3190 ..Default::default()
3191 };
3192
3193 let https_front_cluster1 = RequestHttpFrontend {
3194 cluster_id: Some(String::from("MyCluster_1")),
3195 hostname: String::from("lolcatho.st"),
3196 path: PathRule::prefix(String::from("")),
3197 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3198 ..Default::default()
3199 };
3200
3201 let http_front_cluster2 = RequestHttpFrontend {
3202 cluster_id: Some(String::from("MyCluster_2")),
3203 hostname: String::from("lolcatho.st"),
3204 path: PathRule::prefix(String::from("/api")),
3205 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3206 ..Default::default()
3207 };
3208
3209 let https_front_cluster2 = RequestHttpFrontend {
3210 cluster_id: Some(String::from("MyCluster_2")),
3211 hostname: String::from("lolcatho.st"),
3212 path: PathRule::prefix(String::from("/api")),
3213 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3214 ..Default::default()
3215 };
3216
3217 config
3218 .dispatch(&RequestType::AddHttpFrontend(http_front_cluster1).into())
3219 .expect("Could not execute request");
3220 config
3221 .dispatch(&RequestType::AddHttpFrontend(http_front_cluster2).into())
3222 .expect("Could not execute request");
3223 config
3224 .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster1).into())
3225 .expect("Could not execute request");
3226 config
3227 .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster2).into())
3228 .expect("Could not execute request");
3229
3230 let mut cluster1_cluster2: HashSet<ClusterId> = HashSet::new();
3231 cluster1_cluster2.insert(String::from("MyCluster_1"));
3232 cluster1_cluster2.insert(String::from("MyCluster_2"));
3233
3234 let mut cluster2: HashSet<ClusterId> = HashSet::new();
3235 cluster2.insert(String::from("MyCluster_2"));
3236
3237 let empty: HashSet<ClusterId> = HashSet::new();
3238 assert_eq!(
3239 config.get_cluster_ids_by_domain(String::from("lolcatho.st"), None),
3240 cluster1_cluster2
3241 );
3242 assert_eq!(
3243 config
3244 .get_cluster_ids_by_domain(String::from("lolcatho.st"), Some(String::from("/api"))),
3245 cluster2
3246 );
3247 assert_eq!(
3248 config.get_cluster_ids_by_domain(String::from("lolcathost"), None),
3249 empty
3250 );
3251 assert_eq!(
3252 config
3253 .get_cluster_ids_by_domain(String::from("lolcathost"), Some(String::from("/sozu"))),
3254 empty
3255 );
3256 }
3257
3258 #[test]
3259 fn duplicate_backends() {
3260 let mut state: ConfigState = Default::default();
3261 state
3262 .dispatch(
3263 &RequestType::AddBackend(AddBackend {
3264 cluster_id: String::from("cluster_1"),
3265 backend_id: String::from("cluster_1-0"),
3266 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3267 load_balancing_parameters: Some(LoadBalancingParams::default()),
3268 ..Default::default()
3269 })
3270 .into(),
3271 )
3272 .expect("Could not execute request");
3273
3274 let b = Backend {
3275 cluster_id: String::from("cluster_1"),
3276 backend_id: String::from("cluster_1-0"),
3277 address: "127.0.0.1:1026".parse().unwrap(),
3278 load_balancing_parameters: Some(LoadBalancingParams::default()),
3279 sticky_id: Some("sticky".to_string()),
3280 backup: None,
3281 };
3282
3283 state
3284 .dispatch(&RequestType::AddBackend(b.clone().to_add_backend()).into())
3285 .expect("Could not execute order");
3286
3287 assert_eq!(state.backends.get("cluster_1").unwrap(), &vec![b]);
3288 }
3289
3290 #[test]
3291 fn remove_backend() {
3292 let mut state: ConfigState = Default::default();
3293 state
3294 .dispatch(
3295 &RequestType::AddCluster(Cluster {
3296 cluster_id: String::from("cluster_1"),
3297 ..Default::default()
3298 })
3299 .into(),
3300 )
3301 .expect("Could not execute request");
3302
3303 for i in 0..10 {
3304 state
3305 .dispatch(
3306 &RequestType::AddBackend(AddBackend {
3307 cluster_id: String::from("cluster_1"),
3308 backend_id: format!("cluster_1-{i}"),
3309 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3310 ..Default::default()
3311 })
3312 .into(),
3313 )
3314 .expect("Could not execute request");
3315 }
3316
3317 assert_eq!(state.backends.get("cluster_1").unwrap().len(), 10);
3318
3319 let remove_backend_2 = RequestType::RemoveBackend(RemoveBackend {
3320 cluster_id: String::from("cluster_1"),
3321 backend_id: String::from("cluster_1-0"),
3322 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3323 })
3324 .into();
3325
3326 let remove_backend_result = state.dispatch(&remove_backend_2);
3327
3328 assert!(remove_backend_result.is_ok());
3329 assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
3330
3331 let redundant_remove = state.dispatch(&remove_backend_2);
3332 assert!(matches!(redundant_remove, Err(StateError::NoChange)));
3333 assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
3334 }
3335
3336 #[test]
3337 fn remove_backends_randomly() {
3338 let mut state: ConfigState = Default::default();
3339 state
3340 .dispatch(
3341 &RequestType::AddCluster(Cluster {
3342 cluster_id: String::from("cluster_1"),
3343 ..Default::default()
3344 })
3345 .into(),
3346 )
3347 .expect("Could not execute request");
3348
3349 for _ in 0..1000 {
3350 for i in 0..10 {
3351 state
3352 .dispatch(
3353 &RequestType::AddBackend(AddBackend {
3354 cluster_id: String::from("cluster_1"),
3355 backend_id: format!("cluster_1-{i}"),
3356 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3357 ..Default::default()
3358 })
3359 .into(),
3360 )
3361 .expect("Could not execute request");
3362 }
3363
3364 let mut rng = rng();
3365 let mut indexes = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
3366 indexes.shuffle(&mut rng);
3367 let random_count = rng.random_range(1..indexes.len());
3368 let random_indexes: Vec<i32> = indexes.into_iter().take(random_count).collect();
3369
3370 for j in random_indexes {
3371 let remove_backend_result = state.dispatch(
3372 &RequestType::RemoveBackend(RemoveBackend {
3373 cluster_id: String::from("cluster_1"),
3374 backend_id: format!("cluster_1-{j}"),
3375 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3376 })
3377 .into(),
3378 );
3379 assert!(remove_backend_result.is_ok());
3380 }
3381 }
3382 }
3383
3384 #[test]
3385 fn listener_diff() {
3386 let mut state: ConfigState = Default::default();
3387 let custom_http_answers = Some(CustomHttpAnswers {
3388 answer_404: Some("test".to_string()),
3389 ..Default::default()
3390 });
3391 state
3392 .dispatch(
3393 &RequestType::AddTcpListener(TcpListenerConfig {
3394 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3395 ..Default::default()
3396 })
3397 .into(),
3398 )
3399 .expect("Could not execute request");
3400 state
3401 .dispatch(
3402 &RequestType::ActivateListener(ActivateListener {
3403 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3404 proxy: ListenerType::Tcp.into(),
3405 from_scm: false,
3406 })
3407 .into(),
3408 )
3409 .expect("Could not execute request");
3410 state
3411 .dispatch(
3412 &RequestType::AddHttpListener(HttpListenerConfig {
3413 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3414 ..Default::default()
3415 })
3416 .into(),
3417 )
3418 .expect("Could not execute request");
3419 state
3420 .dispatch(
3421 &RequestType::AddHttpsListener(HttpsListenerConfig {
3422 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3423 ..Default::default()
3424 })
3425 .into(),
3426 )
3427 .expect("Could not execute request");
3428 state
3429 .dispatch(
3430 &RequestType::ActivateListener(ActivateListener {
3431 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3432 proxy: ListenerType::Https.into(),
3433 from_scm: false,
3434 })
3435 .into(),
3436 )
3437 .expect("Could not execute request");
3438
3439 let mut state2: ConfigState = Default::default();
3440 state2
3441 .dispatch(
3442 &RequestType::AddTcpListener(TcpListenerConfig {
3443 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3444 expect_proxy: true,
3445 ..Default::default()
3446 })
3447 .into(),
3448 )
3449 .expect("Could not execute request");
3450 state2
3451 .dispatch(
3452 &RequestType::AddHttpListener(HttpListenerConfig {
3453 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3454 http_answers: custom_http_answers.clone(),
3455 ..Default::default()
3456 })
3457 .into(),
3458 )
3459 .expect("Could not execute request");
3460 state2
3461 .dispatch(
3462 &RequestType::ActivateListener(ActivateListener {
3463 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3464 proxy: ListenerType::Http.into(),
3465 from_scm: false,
3466 })
3467 .into(),
3468 )
3469 .expect("Could not execute request");
3470 state2
3471 .dispatch(
3472 &RequestType::AddHttpsListener(HttpsListenerConfig {
3473 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3474 http_answers: custom_http_answers.clone(),
3475 ..Default::default()
3476 })
3477 .into(),
3478 )
3479 .expect("Could not execute request");
3480 state2
3481 .dispatch(
3482 &RequestType::ActivateListener(ActivateListener {
3483 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3484 proxy: ListenerType::Https.into(),
3485 from_scm: false,
3486 })
3487 .into(),
3488 )
3489 .expect("Could not execute request");
3490
3491 let e: Vec<Request> = vec![
3492 RequestType::RemoveListener(RemoveListener {
3493 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3494 proxy: ListenerType::Tcp.into(),
3495 })
3496 .into(),
3497 RequestType::AddTcpListener(TcpListenerConfig {
3498 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3499 expect_proxy: true,
3500 ..Default::default()
3501 })
3502 .into(),
3503 RequestType::DeactivateListener(DeactivateListener {
3504 address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
3505 proxy: ListenerType::Tcp.into(),
3506 to_scm: false,
3507 })
3508 .into(),
3509 RequestType::RemoveListener(RemoveListener {
3510 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3511 proxy: ListenerType::Http.into(),
3512 })
3513 .into(),
3514 RequestType::AddHttpListener(HttpListenerConfig {
3515 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3516 http_answers: custom_http_answers.clone(),
3517 ..Default::default()
3518 })
3519 .into(),
3520 RequestType::ActivateListener(ActivateListener {
3521 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3522 proxy: ListenerType::Http.into(),
3523 from_scm: false,
3524 })
3525 .into(),
3526 RequestType::RemoveListener(RemoveListener {
3527 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3528 proxy: ListenerType::Https.into(),
3529 })
3530 .into(),
3531 RequestType::AddHttpsListener(HttpsListenerConfig {
3532 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3533 http_answers: custom_http_answers.clone(),
3534 ..Default::default()
3535 })
3536 .into(),
3537 RequestType::ActivateListener(ActivateListener {
3543 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3544 proxy: ListenerType::Https.into(),
3545 from_scm: false,
3546 })
3547 .into(),
3548 ];
3549
3550 let diff = state.diff(&state2);
3551 println!("expected diff requests:\n{e:#?}\n");
3553 println!("diff requests:\n{diff:#?}\n");
3554
3555 let _hash1 = state.hash_state();
3556 let _hash2 = state2.hash_state();
3557
3558 assert_eq!(diff, e);
3559
3560 let mut replayed = state.clone();
3567 for request in &diff {
3568 replayed
3569 .dispatch(request)
3570 .expect("every diff request must replay cleanly onto the source state");
3571 }
3572 assert_eq!(
3573 replayed.tcp_listeners, state2.tcp_listeners,
3574 "replayed tcp_listeners must match the target state"
3575 );
3576 assert_eq!(
3577 replayed.http_listeners, state2.http_listeners,
3578 "replayed http_listeners must match the target state"
3579 );
3580 assert_eq!(
3581 replayed.https_listeners, state2.https_listeners,
3582 "replayed https_listeners must match the target state"
3583 );
3584 let replayed_8443 = replayed
3587 .https_listeners
3588 .get(&SocketAddr::from(SocketAddress::new_v4(0, 0, 0, 0, 8443)))
3589 .expect("8443 HTTPS listener must exist after replay");
3590 assert!(
3591 replayed_8443.active,
3592 "the 8443 HTTPS listener must stay ACTIVE across a config change"
3593 );
3594 }
3595
3596 #[test]
3597 fn certificate_retrieval() {
3598 let mut state: ConfigState = Default::default();
3599 let certificate_and_key = CertificateAndKey {
3600 certificate: String::from(include_str!("../assets/certificate.pem")),
3601 key: String::from(include_str!("../assets/key.pem")),
3602 certificate_chain: vec![],
3603 versions: vec![],
3604 names: vec!["lolcatho.st".to_string()],
3605 };
3606 let add_certificate = AddCertificate {
3607 address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
3608 certificate: certificate_and_key,
3609 expired_at: None,
3610 };
3611 state
3612 .dispatch(&RequestType::AddCertificate(add_certificate).into())
3613 .expect("Could not add certificate");
3614
3615 println!("state: {state:#?}");
3616
3617 let certificates_found_by_fingerprint = state.get_certificates(QueryCertificatesFilters {
3623 domain: None,
3624 fingerprint: Some(
3625 "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5".to_string(),
3626 ),
3627 });
3628
3629 println!("found certificate: {certificates_found_by_fingerprint:#?}");
3630
3631 assert!(!certificates_found_by_fingerprint.is_empty());
3632
3633 let certificate_found_by_domain_name = state.get_certificates(QueryCertificatesFilters {
3634 domain: Some("lolcatho.st".to_string()),
3635 fingerprint: None,
3636 });
3637
3638 assert!(!certificate_found_by_domain_name.is_empty());
3639 }
3640
3641 #[test]
3642 fn count_backends_across_clusters() {
3643 let mut state: ConfigState = Default::default();
3644
3645 assert_eq!(state.count_backends(), 0);
3646
3647 state
3648 .dispatch(
3649 &RequestType::AddBackend(AddBackend {
3650 cluster_id: String::from("cluster_1"),
3651 backend_id: String::from("cluster_1-0"),
3652 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3653 ..Default::default()
3654 })
3655 .into(),
3656 )
3657 .expect("Could not execute request");
3658 assert_eq!(state.count_backends(), 1);
3659
3660 state
3661 .dispatch(
3662 &RequestType::AddBackend(AddBackend {
3663 cluster_id: String::from("cluster_1"),
3664 backend_id: String::from("cluster_1-1"),
3665 address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
3666 ..Default::default()
3667 })
3668 .into(),
3669 )
3670 .expect("Could not execute request");
3671 assert_eq!(state.count_backends(), 2);
3672
3673 state
3675 .dispatch(
3676 &RequestType::AddBackend(AddBackend {
3677 cluster_id: String::from("cluster_2"),
3678 backend_id: String::from("cluster_2-0"),
3679 address: SocketAddress::new_v4(192, 168, 1, 1, 8080),
3680 ..Default::default()
3681 })
3682 .into(),
3683 )
3684 .expect("Could not execute request");
3685 assert_eq!(state.count_backends(), 3);
3686
3687 state
3689 .dispatch(
3690 &RequestType::RemoveBackend(RemoveBackend {
3691 cluster_id: String::from("cluster_1"),
3692 backend_id: String::from("cluster_1-0"),
3693 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
3694 })
3695 .into(),
3696 )
3697 .expect("Could not execute request");
3698 assert_eq!(state.count_backends(), 2);
3699 }
3700
3701 #[test]
3702 fn count_frontends_across_types() {
3703 let mut state: ConfigState = Default::default();
3704
3705 assert_eq!(state.count_frontends(), 0);
3706
3707 state
3709 .dispatch(
3710 &RequestType::AddHttpFrontend(RequestHttpFrontend {
3711 cluster_id: Some(String::from("cluster_1")),
3712 hostname: String::from("example.com"),
3713 path: PathRule::prefix(String::from("/")),
3714 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3715 position: RulePosition::Tree.into(),
3716 ..Default::default()
3717 })
3718 .into(),
3719 )
3720 .expect("Could not execute request");
3721 assert_eq!(state.count_frontends(), 1);
3722
3723 state
3725 .dispatch(
3726 &RequestType::AddHttpsFrontend(RequestHttpFrontend {
3727 cluster_id: Some(String::from("cluster_1")),
3728 hostname: String::from("secure.example.com"),
3729 path: PathRule::prefix(String::from("/")),
3730 address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
3731 position: RulePosition::Tree.into(),
3732 ..Default::default()
3733 })
3734 .into(),
3735 )
3736 .expect("Could not execute request");
3737 assert_eq!(state.count_frontends(), 2);
3738
3739 state
3741 .dispatch(
3742 &RequestType::AddTcpFrontend(RequestTcpFrontend {
3743 cluster_id: String::from("cluster_2"),
3744 address: SocketAddress::new_v4(0, 0, 0, 0, 5432),
3745 ..Default::default()
3746 })
3747 .into(),
3748 )
3749 .expect("Could not execute request");
3750 assert_eq!(state.count_frontends(), 3);
3751
3752 state
3754 .dispatch(
3755 &RequestType::AddTcpFrontend(RequestTcpFrontend {
3756 cluster_id: String::from("cluster_2"),
3757 address: SocketAddress::new_v4(0, 0, 0, 0, 5433),
3758 ..Default::default()
3759 })
3760 .into(),
3761 )
3762 .expect("Could not execute request");
3763 assert_eq!(state.count_frontends(), 4);
3764
3765 state
3767 .dispatch(
3768 &RequestType::RemoveHttpFrontend(RequestHttpFrontend {
3769 cluster_id: Some(String::from("cluster_1")),
3770 hostname: String::from("example.com"),
3771 path: PathRule::prefix(String::from("/")),
3772 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
3773 position: RulePosition::Tree.into(),
3774 ..Default::default()
3775 })
3776 .into(),
3777 )
3778 .expect("Could not execute request");
3779 assert_eq!(state.count_frontends(), 3);
3780 }
3781
3782 fn make_https_listener(address: SocketAddress) -> HttpsListenerConfig {
3785 HttpsListenerConfig {
3786 address,
3787 sticky_name: "SOZUBALANCEID".to_owned(),
3788 front_timeout: 60,
3789 back_timeout: 30,
3790 connect_timeout: 3,
3791 request_timeout: 10,
3792 ..Default::default()
3793 }
3794 }
3795
3796 fn make_http_listener(address: SocketAddress) -> HttpListenerConfig {
3797 HttpListenerConfig {
3798 address,
3799 sticky_name: "SOZUBALANCEID".to_owned(),
3800 front_timeout: 60,
3801 back_timeout: 30,
3802 connect_timeout: 3,
3803 request_timeout: 10,
3804 ..Default::default()
3805 }
3806 }
3807
3808 fn make_tcp_listener(address: SocketAddress) -> TcpListenerConfig {
3809 TcpListenerConfig {
3810 address,
3811 front_timeout: 60,
3812 back_timeout: 30,
3813 connect_timeout: 3,
3814 ..Default::default()
3815 }
3816 }
3817
3818 fn make_udp_listener(address: SocketAddress, active: bool) -> UdpListenerConfig {
3819 UdpListenerConfig {
3820 address,
3821 public_address: None,
3822 front_timeout: 30,
3823 back_timeout: 30,
3824 max_rx_datagram_size: 1500,
3825 max_flows: 0,
3826 active,
3827 }
3828 }
3829
3830 #[test]
3840 fn test_udp_state_roundtrip() {
3841 let address = SocketAddress::new_v4(127, 0, 0, 1, 5353);
3842
3843 let mut state = ConfigState::default();
3844 state
3845 .dispatch(&RequestType::AddUdpListener(make_udp_listener(address, true)).into())
3846 .expect("could not add udp listener");
3847 state
3848 .dispatch(
3849 &RequestType::ActivateListener(ActivateListener {
3850 address,
3851 proxy: ListenerType::Udp.into(),
3852 from_scm: false,
3853 })
3854 .into(),
3855 )
3856 .expect("could not activate udp listener");
3857 state
3858 .dispatch(
3859 &RequestType::AddUdpFrontend(RequestUdpFrontend {
3860 cluster_id: "udp_cluster".to_string(),
3861 address,
3862 tags: BTreeMap::from([("owner".to_string(), "team".to_string())]),
3863 })
3864 .into(),
3865 )
3866 .expect("could not add udp frontend");
3867
3868 assert_eq!(state.udp_listeners.len(), 1);
3869 assert!(state.udp_listeners[&address.into()].active);
3870 assert_eq!(
3871 state.udp_fronts.get("udp_cluster").map(Vec::len),
3872 Some(1usize)
3873 );
3874
3875 let logical = |s: &ConfigState| {
3880 let mut c = s.clone();
3881 c.request_counts.clear();
3882 c
3883 };
3884
3885 let mut replayed = ConfigState::default();
3887 for request in state.generate_requests() {
3888 replayed
3889 .dispatch(&request)
3890 .expect("could not replay generated request");
3891 }
3892 assert_eq!(
3893 logical(&state),
3894 logical(&replayed),
3895 "UDP listener + frontend must survive generate_requests → replay"
3896 );
3897
3898 let empty = ConfigState::default();
3900 let mut from_diff = ConfigState::default();
3901 for request in empty.diff(&state) {
3902 from_diff
3903 .dispatch(&request)
3904 .expect("could not replay diff request");
3905 }
3906 assert_eq!(
3907 logical(&state),
3908 logical(&from_diff),
3909 "diff(empty -> state) must reconstruct the UDP listener + frontend"
3910 );
3911
3912 let mut torn_down = state.clone();
3914 for request in state.diff(&empty) {
3915 torn_down
3916 .dispatch(&request)
3917 .expect("could not replay teardown diff request");
3918 }
3919 assert!(
3920 torn_down.udp_listeners.is_empty(),
3921 "diff(state -> empty) must remove the UDP listener"
3922 );
3923 assert!(
3924 torn_down
3925 .udp_fronts
3926 .get("udp_cluster")
3927 .map(Vec::is_empty)
3928 .unwrap_or(true),
3929 "diff(state -> empty) must remove the UDP frontend"
3930 );
3931
3932 state
3934 .dispatch(
3935 &RequestType::UpdateUdpListener(UpdateUdpListenerConfig {
3936 address,
3937 max_flows: Some(4096),
3938 front_timeout: Some(15),
3939 ..Default::default()
3940 })
3941 .into(),
3942 )
3943 .expect("could not update udp listener");
3944 let updated = &state.udp_listeners[&address.into()];
3945 assert_eq!(updated.max_flows, 4096);
3946 assert_eq!(updated.front_timeout, 15);
3947 assert_eq!(
3948 updated.back_timeout, 30,
3949 "unpatched field must be preserved"
3950 );
3951 }
3952
3953 #[test]
3957 fn list_frontends_includes_udp() {
3958 let tcp_addr = SocketAddress::new_v4(0, 0, 0, 0, 6379);
3959 let udp_addr = SocketAddress::new_v4(0, 0, 0, 0, 5353);
3960
3961 let mut state = ConfigState::default();
3962 state
3963 .dispatch(
3964 &RequestType::AddTcpFrontend(RequestTcpFrontend {
3965 cluster_id: "tcp_cluster".to_string(),
3966 address: tcp_addr,
3967 ..Default::default()
3968 })
3969 .into(),
3970 )
3971 .expect("could not add tcp frontend");
3972 state
3973 .dispatch(
3974 &RequestType::AddUdpFrontend(RequestUdpFrontend {
3975 cluster_id: "udp_cluster".to_string(),
3976 address: udp_addr,
3977 ..Default::default()
3978 })
3979 .into(),
3980 )
3981 .expect("could not add udp frontend");
3982
3983 let all = state.list_frontends(FrontendFilters::default());
3985 assert_eq!(all.tcp_frontends.len(), 1, "tcp frontend must be listed");
3986 assert_eq!(
3987 all.udp_frontends.len(),
3988 1,
3989 "udp frontend must be listed under the default all-pass path"
3990 );
3991 assert_eq!(all.udp_frontends[0].cluster_id, "udp_cluster");
3992 assert_eq!(all.udp_frontends[0].address, udp_addr);
3993
3994 let tcp_only = state.list_frontends(FrontendFilters {
3996 tcp: true,
3997 ..Default::default()
3998 });
3999 assert_eq!(tcp_only.tcp_frontends.len(), 1);
4000 assert_eq!(
4001 tcp_only.udp_frontends.len(),
4002 1,
4003 "udp frontends ride the tcp filter"
4004 );
4005
4006 let http_only = state.list_frontends(FrontendFilters {
4008 http: true,
4009 ..Default::default()
4010 });
4011 assert!(http_only.tcp_frontends.is_empty());
4012 assert!(http_only.udp_frontends.is_empty());
4013
4014 let domain_filtered = state.list_frontends(FrontendFilters {
4016 domain: Some("example.com".to_string()),
4017 ..Default::default()
4018 });
4019 assert!(domain_filtered.tcp_frontends.is_empty());
4020 assert!(domain_filtered.udp_frontends.is_empty());
4021 }
4022
4023 #[test]
4028 fn update_https_listener_happy_path_h2_knobs() {
4029 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4030 let mut state = ConfigState::new();
4031 state
4032 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4033 .unwrap();
4034
4035 let patch = UpdateHttpsListenerConfig {
4036 address: addr,
4037 h2_max_rst_stream_per_window: Some(50),
4038 h2_max_ping_per_window: Some(20),
4039 ..Default::default()
4040 };
4041 state
4042 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4043 .expect("update must succeed");
4044
4045 let listener = state
4046 .https_listeners
4047 .get(&SocketAddr::from(addr))
4048 .expect("listener must be present");
4049 assert_eq!(listener.h2_max_rst_stream_per_window, Some(50));
4050 assert_eq!(listener.h2_max_ping_per_window, Some(20));
4051 assert_eq!(listener.front_timeout, 60);
4053 assert_eq!(listener.h2_max_settings_per_window, None);
4054 }
4055
4056 #[test]
4059 fn update_https_listener_not_found() {
4060 let mut state = ConfigState::new();
4061 let patch = UpdateHttpsListenerConfig {
4062 address: SocketAddress::new_v4(1, 2, 3, 4, 9999),
4063 h2_max_rst_stream_per_window: Some(50),
4064 ..Default::default()
4065 };
4066 let err = state
4067 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4068 .unwrap_err();
4069 assert!(
4070 matches!(
4071 err,
4072 StateError::NotFound {
4073 kind: ObjectKind::HttpsListener,
4074 ..
4075 }
4076 ),
4077 "expected NotFound, got: {err}"
4078 );
4079 }
4080
4081 #[test]
4084 fn update_https_listener_noop_patch() {
4085 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4086 let mut state = ConfigState::new();
4087 let original = make_https_listener(addr);
4088 state
4089 .dispatch(&RequestType::AddHttpsListener(original.clone()).into())
4090 .unwrap();
4091
4092 let patch = UpdateHttpsListenerConfig {
4093 address: addr,
4094 ..Default::default()
4095 };
4096 state
4097 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4098 .expect("no-op patch must succeed");
4099
4100 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4101 assert_eq!(listener.front_timeout, original.front_timeout);
4102 assert_eq!(
4103 listener.h2_max_rst_stream_per_window,
4104 original.h2_max_rst_stream_per_window
4105 );
4106 }
4107
4108 #[test]
4110 fn update_https_listener_invalid_value_flood_knob_zero() {
4111 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4112 let mut state = ConfigState::new();
4113 state
4114 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4115 .unwrap();
4116
4117 let patch = UpdateHttpsListenerConfig {
4118 address: addr,
4119 h2_max_rst_stream_per_window: Some(0),
4120 ..Default::default()
4121 };
4122 let err = state
4123 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4124 .unwrap_err();
4125 assert!(
4126 matches!(
4127 err,
4128 StateError::InvalidValue {
4129 field: "h2_max_rst_stream_per_window",
4130 ..
4131 }
4132 ),
4133 "expected InvalidValue for flood knob 0, got: {err}"
4134 );
4135 }
4136
4137 #[test]
4143 fn add_cluster_invalid_health_check_uri_rejected() {
4144 use crate::proto::command::HealthCheckConfig;
4145
4146 let mut state = ConfigState::new();
4147 let err = state
4148 .dispatch(
4149 &RequestType::AddCluster(Cluster {
4150 cluster_id: String::from("evil_cluster"),
4151 health_check: Some(HealthCheckConfig {
4152 uri: String::from("/foo\r\nGET /admin"),
4153 interval: 5_000,
4154 timeout: 1_000,
4155 healthy_threshold: 2,
4156 unhealthy_threshold: 2,
4157 ..Default::default()
4158 }),
4159 ..Default::default()
4160 })
4161 .into(),
4162 )
4163 .unwrap_err();
4164
4165 assert!(
4166 matches!(
4167 err,
4168 StateError::InvalidValue {
4169 field: "health_check",
4170 ..
4171 }
4172 ),
4173 "expected InvalidValue for CRLF-bearing health-check URI, got: {err}"
4174 );
4175 assert!(
4176 !state.clusters.contains_key("evil_cluster"),
4177 "cluster must not be inserted when health_check fails validation",
4178 );
4179 }
4180
4181 #[test]
4183 fn update_https_listener_alpn_unknown_value_rejected() {
4184 use crate::proto::command::AlpnProtocols;
4185
4186 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4187 let mut state = ConfigState::new();
4188 state
4189 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4190 .unwrap();
4191
4192 let patch = UpdateHttpsListenerConfig {
4193 address: addr,
4194 alpn_protocols: Some(AlpnProtocols {
4195 values: vec!["h3".to_owned()],
4196 }),
4197 ..Default::default()
4198 };
4199 let err = state
4200 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4201 .unwrap_err();
4202 assert!(
4203 matches!(
4204 err,
4205 StateError::InvalidValue {
4206 field: "alpn_protocols",
4207 ..
4208 }
4209 ),
4210 "expected InvalidValue for unknown ALPN, got: {err}"
4211 );
4212 }
4213
4214 #[test]
4216 fn update_https_listener_alpn_empty_reset_accepted() {
4217 use crate::proto::command::AlpnProtocols;
4218
4219 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4220 let mut state = ConfigState::new();
4221 let mut listener = make_https_listener(addr);
4222 listener.alpn_protocols = vec!["h2".to_owned()];
4223 state
4224 .dispatch(&RequestType::AddHttpsListener(listener).into())
4225 .unwrap();
4226
4227 let patch = UpdateHttpsListenerConfig {
4228 address: addr,
4229 alpn_protocols: Some(AlpnProtocols { values: vec![] }),
4230 ..Default::default()
4231 };
4232 state
4233 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4234 .expect("empty ALPN reset must succeed");
4235
4236 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4237 assert!(
4238 listener.alpn_protocols.is_empty(),
4239 "ALPN must have been reset to empty"
4240 );
4241 }
4242
4243 #[test]
4245 fn update_https_listener_alpn_valid_values_accepted() {
4246 use crate::proto::command::AlpnProtocols;
4247
4248 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4249 let mut state = ConfigState::new();
4250 state
4251 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4252 .unwrap();
4253
4254 let patch = UpdateHttpsListenerConfig {
4255 address: addr,
4256 alpn_protocols: Some(AlpnProtocols {
4257 values: vec!["h2".to_owned(), "http/1.1".to_owned()],
4258 }),
4259 ..Default::default()
4260 };
4261 state
4262 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4263 .expect("valid ALPN must be accepted");
4264
4265 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4266 assert_eq!(listener.alpn_protocols, vec!["h2", "http/1.1"]);
4267 }
4268
4269 #[test]
4272 fn update_https_listener_alpn_absent_preserves_existing() {
4273 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4274 let mut state = ConfigState::new();
4275 let mut listener = make_https_listener(addr);
4276 listener.alpn_protocols = vec!["h2".to_owned()];
4277 state
4278 .dispatch(&RequestType::AddHttpsListener(listener).into())
4279 .unwrap();
4280
4281 let patch = UpdateHttpsListenerConfig {
4283 address: addr,
4284 front_timeout: Some(10),
4285 ..Default::default()
4286 };
4287 state
4288 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4289 .unwrap();
4290
4291 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4292 assert_eq!(
4293 listener.alpn_protocols,
4294 vec!["h2"],
4295 "ALPN must be preserved when not patched"
4296 );
4297 }
4298
4299 #[test]
4301 fn update_https_listener_sozu_id_header_empty_rejected() {
4302 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4303 let mut state = ConfigState::new();
4304 state
4305 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4306 .unwrap();
4307
4308 let patch = UpdateHttpsListenerConfig {
4309 address: addr,
4310 sozu_id_header: Some(String::new()),
4311 ..Default::default()
4312 };
4313 let err = state
4314 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4315 .unwrap_err();
4316 assert!(
4317 matches!(
4318 err,
4319 StateError::InvalidValue {
4320 field: "sozu_id_header",
4321 ..
4322 }
4323 ),
4324 "expected InvalidValue for empty header name, got: {err}"
4325 );
4326 }
4327
4328 #[test]
4330 fn update_https_listener_sozu_id_header_colon_rejected() {
4331 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4332 let mut state = ConfigState::new();
4333 state
4334 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4335 .unwrap();
4336
4337 let patch = UpdateHttpsListenerConfig {
4338 address: addr,
4339 sozu_id_header: Some("bad: value".to_owned()),
4340 ..Default::default()
4341 };
4342 let err = state
4343 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4344 .unwrap_err();
4345 assert!(
4346 matches!(
4347 err,
4348 StateError::InvalidValue {
4349 field: "sozu_id_header",
4350 ..
4351 }
4352 ),
4353 "expected InvalidValue for header name with colon, got: {err}"
4354 );
4355 }
4356
4357 #[test]
4359 fn update_https_listener_sozu_id_header_valid_accepted() {
4360 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4361 let mut state = ConfigState::new();
4362 state
4363 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4364 .unwrap();
4365
4366 let patch = UpdateHttpsListenerConfig {
4367 address: addr,
4368 sozu_id_header: Some("X-Edge-Id".to_owned()),
4369 ..Default::default()
4370 };
4371 state
4372 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4373 .expect("valid header name must be accepted");
4374
4375 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4376 assert_eq!(listener.sozu_id_header.as_deref(), Some("X-Edge-Id"));
4377 }
4378
4379 #[test]
4381 fn update_https_listener_graceful_shutdown_zero_allowed() {
4382 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
4383 let mut state = ConfigState::new();
4384 state
4385 .dispatch(&RequestType::AddHttpsListener(make_https_listener(addr)).into())
4386 .unwrap();
4387
4388 let patch = UpdateHttpsListenerConfig {
4389 address: addr,
4390 h2_graceful_shutdown_deadline_seconds: Some(0),
4391 ..Default::default()
4392 };
4393 state
4394 .dispatch(&RequestType::UpdateHttpsListener(patch).into())
4395 .expect("graceful_shutdown_deadline=0 must be allowed");
4396
4397 let listener = state.https_listeners.get(&SocketAddr::from(addr)).unwrap();
4398 assert_eq!(listener.h2_graceful_shutdown_deadline_seconds, Some(0));
4399 }
4400
4401 #[test]
4405 fn update_http_listener_happy_path() {
4406 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
4407 let mut state = ConfigState::new();
4408 state
4409 .dispatch(&RequestType::AddHttpListener(make_http_listener(addr)).into())
4410 .unwrap();
4411
4412 let patch = UpdateHttpListenerConfig {
4413 address: addr,
4414 front_timeout: Some(15),
4415 h2_max_rst_stream_per_window: Some(25),
4416 ..Default::default()
4417 };
4418 state
4419 .dispatch(&RequestType::UpdateHttpListener(patch).into())
4420 .expect("HTTP update must succeed");
4421
4422 let listener = state.http_listeners.get(&SocketAddr::from(addr)).unwrap();
4423 assert_eq!(listener.front_timeout, 15);
4424 assert_eq!(listener.h2_max_rst_stream_per_window, Some(25));
4425 assert_eq!(listener.back_timeout, 30);
4427 }
4428
4429 #[test]
4431 fn update_http_listener_flood_knob_zero_rejected() {
4432 let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
4433 let mut state = ConfigState::new();
4434 state
4435 .dispatch(&RequestType::AddHttpListener(make_http_listener(addr)).into())
4436 .unwrap();
4437
4438 let patch = UpdateHttpListenerConfig {
4439 address: addr,
4440 h2_max_window_update_stream0_per_window: Some(0),
4441 ..Default::default()
4442 };
4443 let err = state
4444 .dispatch(&RequestType::UpdateHttpListener(patch).into())
4445 .unwrap_err();
4446 assert!(
4447 matches!(
4448 err,
4449 StateError::InvalidValue {
4450 field: "h2_max_window_update_stream0_per_window",
4451 ..
4452 }
4453 ),
4454 "expected InvalidValue, got: {err}"
4455 );
4456 }
4457
4458 #[test]
4462 fn update_tcp_listener_happy_path() {
4463 let addr = SocketAddress::new_v4(0, 0, 0, 0, 9000);
4464 let mut state = ConfigState::new();
4465 state
4466 .dispatch(&RequestType::AddTcpListener(make_tcp_listener(addr)).into())
4467 .unwrap();
4468
4469 let patch = UpdateTcpListenerConfig {
4470 address: addr,
4471 front_timeout: Some(5),
4472 ..Default::default()
4473 };
4474 state
4475 .dispatch(&RequestType::UpdateTcpListener(patch).into())
4476 .expect("TCP update must succeed");
4477
4478 let listener = state.tcp_listeners.get(&SocketAddr::from(addr)).unwrap();
4479 assert_eq!(listener.front_timeout, 5);
4480 assert_eq!(listener.back_timeout, 30); }
4482
4483 #[test]
4485 fn update_tcp_listener_not_found() {
4486 let mut state = ConfigState::new();
4487 let patch = UpdateTcpListenerConfig {
4488 address: SocketAddress::new_v4(9, 9, 9, 9, 9999),
4489 front_timeout: Some(5),
4490 ..Default::default()
4491 };
4492 let err = state
4493 .dispatch(&RequestType::UpdateTcpListener(patch).into())
4494 .unwrap_err();
4495 assert!(
4496 matches!(
4497 err,
4498 StateError::NotFound {
4499 kind: ObjectKind::TcpListener,
4500 ..
4501 }
4502 ),
4503 "expected NotFound, got: {err}"
4504 );
4505 }
4506
4507 #[test]
4513 fn dispatch_passes_through_set_metric_detail() {
4514 use crate::proto::command::{MetricDetail, SetMetricDetail};
4515 let mut state = ConfigState::new();
4516 let req: Request = RequestType::SetMetricDetail(SetMetricDetail {
4517 client_id: "test:1".to_owned(),
4518 detail: Some(MetricDetail::DetailBackend as i32),
4519 ttl_seconds: Some(60),
4520 clear: Some(false),
4521 reason: Some("regression-guard".to_owned()),
4522 peer_pid: None,
4523 peer_session_ulid: None,
4524 })
4525 .into();
4526 state
4527 .dispatch(&req)
4528 .expect("SetMetricDetail must traverse dispatch without UndispatchableRequest");
4529 }
4530}