1use std::{
2 collections::{
3 BTreeMap, BTreeSet, HashMap, HashSet, btree_map::Entry as BTreeMapEntry,
4 hash_map::DefaultHasher,
5 },
6 fs::File,
7 hash::{Hash, Hasher},
8 io::Write,
9 iter::repeat,
10 net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16 ObjectKind,
17 certificate::{CertificateError, Fingerprint, calculate_fingerprint},
18 proto::{
19 command::{
20 ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster,
21 ClusterInformation, CustomHttpAnswers, DeactivateListener, FrontendFilters,
22 HealthChecksList, HttpListenerConfig, HttpsListenerConfig, InitialState,
23 ListedFrontends, ListenerType, ListenersList, PathRule, QueryCertificatesFilters,
24 RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate, Request,
25 RequestCounts, RequestHttpFrontend, RequestTcpFrontend, SetHealthCheck, SocketAddress,
26 TcpListenerConfig, UpdateHttpListenerConfig, UpdateHttpsListenerConfig,
27 UpdateTcpListenerConfig, WorkerRequest, request::RequestType,
28 },
29 display::format_request_type,
30 },
31 response::{Backend, HttpFrontend, TcpFrontend},
32};
33
/// Identifier of a cluster, used as the key of the per-cluster maps
/// (`clusters`, `backends`, `tcp_fronts`) held by `ConfigState`.
pub type ClusterId = String;
36
/// Errors returned when a request is applied to a `ConfigState`.
#[derive(thiserror::Error, Debug)]
pub enum StateError {
    /// The request carried no `request_type`.
    #[error("Request came in empty")]
    EmptyRequest,
    /// The request was understood but nothing was removed or modified.
    #[error("dispatching this request did not bring any change to the state")]
    NoChange,
    /// The request type is not one the state knows how to apply.
    #[error("State can not handle this request")]
    UndispatchableRequest,
    /// The object targeted by the request is not present in the state.
    #[error("Did not find {kind:?} with address or id '{id}'")]
    NotFound { kind: ObjectKind, id: String },
    /// An object with the same key is already present in the state.
    #[error("{kind:?} '{id}' already exists")]
    Exists { kind: ObjectKind, id: String },
    /// An integer field could not be converted to its enum
    /// (for instance the `proxy` field of listener requests).
    #[error("Wrong field value: {0}")]
    WrongFieldValue(UnknownEnumValue),
    /// The certificate of an `AddCertificate` request could not be processed.
    #[error("Could not add certificate: {0}")]
    AddCertificate(CertificateError),
    /// A `RemoveCertificate` request could not be applied.
    #[error("Could not remove certificate: {0}")]
    RemoveCertificate(String),
    /// A `ReplaceCertificate` request could not be applied.
    #[error("Could not replace certificate: {0}")]
    ReplaceCertificate(String),
    /// A requested frontend could not be converted into the stored frontend type.
    #[error(
        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
    )]
    FrontendConversion { frontend: String, error: String },
    /// Wraps an I/O error met while writing the state to a file.
    #[error("Could not write state to file: {0}")]
    FileError(std::io::Error),
    /// A field failed validation; `reason` is the validator's explanation.
    #[error("Invalid value for field '{field}': {reason}")]
    InvalidValue {
        field: &'static str,
        reason: &'static str,
    },
}
69
/// In-memory picture of the proxy configuration: clusters, backends,
/// listeners, frontends and certificates, plus a census of received requests.
///
/// It is mutated through [`ConfigState::dispatch`].
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConfigState {
    /// cluster id -> cluster
    pub clusters: BTreeMap<ClusterId, Cluster>,
    /// cluster id -> backends of that cluster (kept sorted on insertion/removal)
    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
    /// socket address -> HTTP listener
    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
    /// socket address -> HTTPS listener
    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
    /// socket address -> TCP listener
    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
    /// HTTP frontends, keyed by the string rendering (`to_string()`) of the
    /// frontend request that created them
    pub http_fronts: BTreeMap<String, HttpFrontend>,
    /// HTTPS frontends, keyed the same way as `http_fronts`
    pub https_fronts: BTreeMap<String, HttpFrontend>,
    /// cluster id -> TCP frontends of that cluster
    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
    /// listener address -> (certificate fingerprint -> certificate and key)
    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
    /// formatted request type -> number of times such a request was dispatched
    pub request_counts: BTreeMap<String, i32>,
}
98
99impl ConfigState {
100 pub fn new() -> Self {
101 Self::default()
102 }
103
104 pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
105 let request_type = match &request.request_type {
106 Some(t) => t,
107 None => return Err(StateError::EmptyRequest),
108 };
109
110 self.increment_request_count(request);
111
112 match request_type {
113 RequestType::AddCluster(cluster) => self.add_cluster(cluster),
114 RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
115 RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
116 RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
117 RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
118 RequestType::RemoveListener(remove) => self.remove_listener(remove),
119 RequestType::ActivateListener(activate) => self.activate_listener(activate),
120 RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
121 RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
122 RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
123 RequestType::AddCertificate(add) => self.add_certificate(add),
124 RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
125 RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
126 RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
127 RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
128 RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
129 RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
130 RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
131 RequestType::RemoveBackend(backend) => self.remove_backend(backend),
132 RequestType::UpdateHttpListener(patch) => self.update_http_listener(patch),
133 RequestType::UpdateHttpsListener(patch) => self.update_https_listener(patch),
134 RequestType::UpdateTcpListener(patch) => self.update_tcp_listener(patch),
135 RequestType::SetHealthCheck(set) => self.set_health_check(set),
136 RequestType::RemoveHealthCheck(cluster_id) => self.remove_health_check(cluster_id),
137
138 RequestType::Logging(_)
145 | RequestType::CountRequests(_)
146 | RequestType::Status(_)
147 | RequestType::SoftStop(_)
148 | RequestType::QueryCertificatesFromWorkers(_)
149 | RequestType::QueryClusterById(_)
150 | RequestType::QueryClustersByDomain(_)
151 | RequestType::QueryMetrics(_)
152 | RequestType::QueryClustersHashes(_)
153 | RequestType::ConfigureMetrics(_)
154 | RequestType::SetMetricDetail(_)
155 | RequestType::ReturnListenSockets(_)
156 | RequestType::SetMaxConnectionsPerIp(_)
157 | RequestType::QueryMaxConnectionsPerIp(_)
158 | RequestType::HardStop(_) => Ok(()),
159
160 _other_request => Err(StateError::UndispatchableRequest),
161 }
162 }
163
164 fn increment_request_count(&mut self, request: &Request) {
166 if let Some(request_type) = &request.request_type {
167 let count = self
168 .request_counts
169 .entry(format_request_type(request_type).to_owned())
170 .or_insert(1);
171 *count += 1;
172 }
173 }
174
175 pub fn get_request_counts(&self) -> RequestCounts {
176 RequestCounts {
177 map: self.request_counts.clone(),
178 }
179 }
180
181 fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
182 if let Some(hc) = cluster.health_check.as_ref() {
189 if let Err(reason) = crate::config::validate_health_check_config(hc) {
190 return Err(StateError::InvalidValue {
191 field: "health_check",
192 reason,
193 });
194 }
195 }
196 let cluster = cluster.clone();
197 self.clusters.insert(cluster.cluster_id.clone(), cluster);
198 Ok(())
199 }
200
201 fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
202 match self.clusters.remove(cluster_id) {
203 Some(_) => Ok(()),
204 None => Err(StateError::NotFound {
205 kind: ObjectKind::Cluster,
206 id: cluster_id.to_owned(),
207 }),
208 }
209 }
210
211 fn set_health_check(&mut self, set: &SetHealthCheck) -> Result<(), StateError> {
212 if let Err(reason) = crate::config::validate_health_check_config(&set.config) {
219 return Err(StateError::InvalidValue {
220 field: "health_check",
221 reason,
222 });
223 }
224 match self.clusters.get_mut(&set.cluster_id) {
225 Some(cluster) => {
226 cluster.health_check = Some(set.config.to_owned());
227 Ok(())
228 }
229 None => Err(StateError::NotFound {
230 kind: ObjectKind::Cluster,
231 id: set.cluster_id.to_owned(),
232 }),
233 }
234 }
235
236 fn remove_health_check(&mut self, cluster_id: &str) -> Result<(), StateError> {
237 match self.clusters.get_mut(cluster_id) {
238 Some(cluster) => {
239 cluster.health_check = None;
240 Ok(())
241 }
242 None => Err(StateError::NotFound {
243 kind: ObjectKind::Cluster,
244 id: cluster_id.to_owned(),
245 }),
246 }
247 }
248
249 pub fn list_health_checks(&self, cluster_id: Option<&str>) -> HealthChecksList {
250 let map = self
251 .clusters
252 .iter()
253 .filter(|(id, _)| cluster_id.is_none_or(|filter| filter == id.as_str()))
254 .filter_map(|(id, cluster)| {
255 cluster
256 .health_check
257 .as_ref()
258 .map(|hc| (id.to_owned(), hc.to_owned()))
259 })
260 .collect();
261 HealthChecksList { map }
262 }
263
264 fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
265 let address: SocketAddr = listener.address.into();
266 match self.http_listeners.entry(address) {
267 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
268 BTreeMapEntry::Occupied(_) => {
269 return Err(StateError::Exists {
270 kind: ObjectKind::HttpListener,
271 id: address.to_string(),
272 });
273 }
274 };
275 Ok(())
276 }
277
278 fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
279 let address: SocketAddr = listener.address.into();
280 match self.https_listeners.entry(address) {
281 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
282 BTreeMapEntry::Occupied(_) => {
283 return Err(StateError::Exists {
284 kind: ObjectKind::HttpsListener,
285 id: address.to_string(),
286 });
287 }
288 };
289 Ok(())
290 }
291
292 fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
293 let address: SocketAddr = listener.address.into();
294 match self.tcp_listeners.entry(address) {
295 BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
296 BTreeMapEntry::Occupied(_) => {
297 return Err(StateError::Exists {
298 kind: ObjectKind::TcpListener,
299 id: address.to_string(),
300 });
301 }
302 };
303 Ok(())
304 }
305
306 fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
307 match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
308 ListenerType::Http => self.remove_http_listener(&remove.address.into()),
309 ListenerType::Https => self.remove_https_listener(&remove.address.into()),
310 ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
311 }
312 }
313
314 fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
315 if self.http_listeners.remove(address).is_none() {
316 return Err(StateError::NoChange);
317 }
318 Ok(())
319 }
320
321 fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
322 if self.https_listeners.remove(address).is_none() {
323 return Err(StateError::NoChange);
324 }
325 Ok(())
326 }
327
328 fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
329 if self.tcp_listeners.remove(address).is_none() {
330 return Err(StateError::NoChange);
331 }
332 Ok(())
333 }
334
335 fn update_http_listener(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
342 validate_h2_flood_knobs_http(patch)?;
343
344 let address: SocketAddr = patch.address.into();
345 let listener =
346 self.http_listeners
347 .get_mut(&address)
348 .ok_or_else(|| StateError::NotFound {
349 kind: ObjectKind::HttpListener,
350 id: address.to_string(),
351 })?;
352
353 if let Some(v) = patch.public_address {
355 listener.public_address = Some(v);
356 }
357 if let Some(v) = patch.expect_proxy {
358 listener.expect_proxy = v;
359 }
360 if let Some(ref v) = patch.sticky_name {
361 listener.sticky_name = v.to_owned();
362 }
363 if let Some(v) = patch.front_timeout {
364 listener.front_timeout = v;
365 }
366 if let Some(v) = patch.back_timeout {
367 listener.back_timeout = v;
368 }
369 if let Some(v) = patch.connect_timeout {
370 listener.connect_timeout = v;
371 }
372 if let Some(v) = patch.request_timeout {
373 listener.request_timeout = v;
374 }
375 if let Some(patch_answers) = patch.http_answers.as_ref() {
376 merge_custom_http_answers(&mut listener.http_answers, patch_answers);
377 }
378 if let Some(v) = patch.h2_max_rst_stream_per_window {
380 listener.h2_max_rst_stream_per_window = Some(v);
381 }
382 if let Some(v) = patch.h2_max_ping_per_window {
383 listener.h2_max_ping_per_window = Some(v);
384 }
385 if let Some(v) = patch.h2_max_settings_per_window {
386 listener.h2_max_settings_per_window = Some(v);
387 }
388 if let Some(v) = patch.h2_max_empty_data_per_window {
389 listener.h2_max_empty_data_per_window = Some(v);
390 }
391 if let Some(v) = patch.h2_max_continuation_frames {
392 listener.h2_max_continuation_frames = Some(v);
393 }
394 if let Some(v) = patch.h2_max_glitch_count {
395 listener.h2_max_glitch_count = Some(v);
396 }
397 if let Some(v) = patch.h2_initial_connection_window {
398 listener.h2_initial_connection_window = Some(v);
399 }
400 if let Some(v) = patch.h2_max_concurrent_streams {
401 listener.h2_max_concurrent_streams = Some(v);
402 }
403 if let Some(v) = patch.h2_stream_shrink_ratio {
404 listener.h2_stream_shrink_ratio = Some(v);
405 }
406 if let Some(v) = patch.h2_max_rst_stream_lifetime {
407 listener.h2_max_rst_stream_lifetime = Some(v);
408 }
409 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
410 listener.h2_max_rst_stream_abusive_lifetime = Some(v);
411 }
412 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
413 listener.h2_max_rst_stream_emitted_lifetime = Some(v);
414 }
415 if let Some(v) = patch.h2_max_header_list_size {
416 listener.h2_max_header_list_size = Some(v);
417 }
418 if let Some(v) = patch.h2_max_header_table_size {
419 listener.h2_max_header_table_size = Some(v);
420 }
421 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
422 listener.h2_stream_idle_timeout_seconds = Some(v);
423 }
424 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
426 listener.h2_graceful_shutdown_deadline_seconds = Some(v);
427 }
428 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
429 listener.h2_max_window_update_stream0_per_window = Some(v);
430 }
431 if let Some(ref v) = patch.sozu_id_header {
432 validate_sozu_id_header(v)?;
433 listener.sozu_id_header = Some(v.to_owned());
434 }
435 Ok(())
436 }
437
438 fn update_https_listener(
445 &mut self,
446 patch: &UpdateHttpsListenerConfig,
447 ) -> Result<(), StateError> {
448 validate_h2_flood_knobs_https(patch)?;
449
450 let address: SocketAddr = patch.address.into();
451 let listener =
452 self.https_listeners
453 .get_mut(&address)
454 .ok_or_else(|| StateError::NotFound {
455 kind: ObjectKind::HttpsListener,
456 id: address.to_string(),
457 })?;
458
459 if let Some(v) = patch.public_address {
461 listener.public_address = Some(v);
462 }
463 if let Some(v) = patch.expect_proxy {
464 listener.expect_proxy = v;
465 }
466 if let Some(ref v) = patch.sticky_name {
467 listener.sticky_name = v.to_owned();
468 }
469 if let Some(v) = patch.front_timeout {
470 listener.front_timeout = v;
471 }
472 if let Some(v) = patch.back_timeout {
473 listener.back_timeout = v;
474 }
475 if let Some(v) = patch.connect_timeout {
476 listener.connect_timeout = v;
477 }
478 if let Some(v) = patch.request_timeout {
479 listener.request_timeout = v;
480 }
481 if let Some(patch_answers) = patch.http_answers.as_ref() {
482 merge_custom_http_answers(&mut listener.http_answers, patch_answers);
483 }
484 if let Some(ref alpn_wrapper) = patch.alpn_protocols {
486 validate_alpn_protocols(&alpn_wrapper.values)?;
487 listener.alpn_protocols = alpn_wrapper.values.clone();
489 }
490 if let Some(v) = patch.strict_sni_binding {
491 listener.strict_sni_binding = Some(v);
492 }
493 if let Some(v) = patch.disable_http11 {
494 listener.disable_http11 = Some(v);
495 }
496 if let Some(v) = patch.h2_max_rst_stream_per_window {
498 listener.h2_max_rst_stream_per_window = Some(v);
499 }
500 if let Some(v) = patch.h2_max_ping_per_window {
501 listener.h2_max_ping_per_window = Some(v);
502 }
503 if let Some(v) = patch.h2_max_settings_per_window {
504 listener.h2_max_settings_per_window = Some(v);
505 }
506 if let Some(v) = patch.h2_max_empty_data_per_window {
507 listener.h2_max_empty_data_per_window = Some(v);
508 }
509 if let Some(v) = patch.h2_max_continuation_frames {
510 listener.h2_max_continuation_frames = Some(v);
511 }
512 if let Some(v) = patch.h2_max_glitch_count {
513 listener.h2_max_glitch_count = Some(v);
514 }
515 if let Some(v) = patch.h2_initial_connection_window {
516 listener.h2_initial_connection_window = Some(v);
517 }
518 if let Some(v) = patch.h2_max_concurrent_streams {
519 listener.h2_max_concurrent_streams = Some(v);
520 }
521 if let Some(v) = patch.h2_stream_shrink_ratio {
522 listener.h2_stream_shrink_ratio = Some(v);
523 }
524 if let Some(v) = patch.h2_max_rst_stream_lifetime {
525 listener.h2_max_rst_stream_lifetime = Some(v);
526 }
527 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
528 listener.h2_max_rst_stream_abusive_lifetime = Some(v);
529 }
530 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
531 listener.h2_max_rst_stream_emitted_lifetime = Some(v);
532 }
533 if let Some(v) = patch.h2_max_header_list_size {
534 listener.h2_max_header_list_size = Some(v);
535 }
536 if let Some(v) = patch.h2_max_header_table_size {
537 listener.h2_max_header_table_size = Some(v);
538 }
539 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
540 listener.h2_stream_idle_timeout_seconds = Some(v);
541 }
542 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
544 listener.h2_graceful_shutdown_deadline_seconds = Some(v);
545 }
546 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
547 listener.h2_max_window_update_stream0_per_window = Some(v);
548 }
549 if let Some(ref v) = patch.sozu_id_header {
550 validate_sozu_id_header(v)?;
551 listener.sozu_id_header = Some(v.to_owned());
552 }
553 Ok(())
554 }
555
556 fn update_tcp_listener(&mut self, patch: &UpdateTcpListenerConfig) -> Result<(), StateError> {
561 let address: SocketAddr = patch.address.into();
562 let listener =
563 self.tcp_listeners
564 .get_mut(&address)
565 .ok_or_else(|| StateError::NotFound {
566 kind: ObjectKind::TcpListener,
567 id: address.to_string(),
568 })?;
569
570 if let Some(v) = patch.public_address {
571 listener.public_address = Some(v);
572 }
573 if let Some(v) = patch.expect_proxy {
574 listener.expect_proxy = v;
575 }
576 if let Some(v) = patch.front_timeout {
577 listener.front_timeout = v;
578 }
579 if let Some(v) = patch.back_timeout {
580 listener.back_timeout = v;
581 }
582 if let Some(v) = patch.connect_timeout {
583 listener.connect_timeout = v;
584 }
585 Ok(())
586 }
587
588 fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
589 match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
590 ListenerType::Http => self
591 .http_listeners
592 .get_mut(&activate.address.into())
593 .map(|listener| listener.active = true)
594 .ok_or(StateError::NotFound {
595 kind: ObjectKind::HttpListener,
596 id: activate.address.to_string(),
597 }),
598 ListenerType::Https => self
599 .https_listeners
600 .get_mut(&activate.address.into())
601 .map(|listener| listener.active = true)
602 .ok_or(StateError::NotFound {
603 kind: ObjectKind::HttpsListener,
604 id: activate.address.to_string(),
605 }),
606 ListenerType::Tcp => self
607 .tcp_listeners
608 .get_mut(&activate.address.into())
609 .map(|listener| listener.active = true)
610 .ok_or(StateError::NotFound {
611 kind: ObjectKind::TcpListener,
612 id: activate.address.to_string(),
613 }),
614 }
615 }
616
617 fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
618 match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
619 ListenerType::Http => self
620 .http_listeners
621 .get_mut(&deactivate.address.into())
622 .map(|listener| listener.active = false)
623 .ok_or(StateError::NotFound {
624 kind: ObjectKind::HttpListener,
625 id: deactivate.address.to_string(),
626 }),
627 ListenerType::Https => self
628 .https_listeners
629 .get_mut(&deactivate.address.into())
630 .map(|listener| listener.active = false)
631 .ok_or(StateError::NotFound {
632 kind: ObjectKind::HttpsListener,
633 id: deactivate.address.to_string(),
634 }),
635 ListenerType::Tcp => self
636 .tcp_listeners
637 .get_mut(&deactivate.address.into())
638 .map(|listener| listener.active = false)
639 .ok_or(StateError::NotFound {
640 kind: ObjectKind::TcpListener,
641 id: deactivate.address.to_string(),
642 }),
643 }
644 }
645
646 fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
647 let front_as_key = front.to_string();
648
649 match self.http_fronts.entry(front.to_string()) {
650 BTreeMapEntry::Vacant(e) => {
651 e.insert(front.clone().to_frontend().map_err(|into_error| {
652 StateError::FrontendConversion {
653 frontend: front_as_key,
654 error: into_error.to_string(),
655 }
656 })?)
657 }
658 BTreeMapEntry::Occupied(_) => {
659 return Err(StateError::Exists {
660 kind: ObjectKind::HttpFrontend,
661 id: front.to_string(),
662 });
663 }
664 };
665 Ok(())
666 }
667
668 fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
669 let front_as_key = front.to_string();
670
671 match self.https_fronts.entry(front.to_string()) {
672 BTreeMapEntry::Vacant(e) => {
673 e.insert(front.clone().to_frontend().map_err(|into_error| {
674 StateError::FrontendConversion {
675 frontend: front_as_key,
676 error: into_error.to_string(),
677 }
678 })?)
679 }
680 BTreeMapEntry::Occupied(_) => {
681 return Err(StateError::Exists {
682 kind: ObjectKind::HttpsFrontend,
683 id: front.to_string(),
684 });
685 }
686 };
687 Ok(())
688 }
689
690 fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
691 self.http_fronts
692 .remove(&front.to_string())
693 .ok_or(StateError::NotFound {
694 kind: ObjectKind::HttpFrontend,
695 id: front.to_string(),
696 })?;
697 Ok(())
698 }
699
700 fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
701 self.https_fronts
702 .remove(&front.to_string())
703 .ok_or(StateError::NotFound {
704 kind: ObjectKind::HttpsFrontend,
705 id: front.to_string(),
706 })?;
707 Ok(())
708 }
709
710 fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
711 let fingerprint = add
712 .certificate
713 .fingerprint()
714 .map_err(StateError::AddCertificate)?;
715
716 let entry = self.certificates.entry(add.address.into()).or_default();
717
718 let mut add = add.clone();
719 add.certificate
720 .apply_overriding_names()
721 .map_err(StateError::AddCertificate)?;
722
723 if entry.contains_key(&fingerprint) {
724 info!(
725 "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
726 fingerprint,
727 add.certificate.names.join(", "),
728 add.address
729 );
730 return Ok(());
731 }
732
733 entry.insert(fingerprint, add.certificate);
734 Ok(())
735 }
736
737 fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
738 let fingerprint = Fingerprint(
739 hex::decode(&remove.fingerprint)
740 .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
741 );
742
743 if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
744 index.remove(&fingerprint);
745 }
746
747 Ok(())
748 }
749
750 fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
755 let replace_address = replace.address.into();
756 let old_fingerprint = Fingerprint(
757 hex::decode(&replace.old_fingerprint)
758 .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
759 );
760
761 self.certificates
762 .get_mut(&replace_address)
763 .ok_or(StateError::NotFound {
764 kind: ObjectKind::Certificate,
765 id: replace.address.to_string(),
766 })?
767 .remove(&old_fingerprint);
768
769 let new_fingerprint = Fingerprint(
770 calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
771 |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
772 )?,
773 );
774
775 self.certificates
776 .get_mut(&replace_address)
777 .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
778
779 if !self
780 .certificates
781 .get(&replace_address)
782 .ok_or(StateError::ReplaceCertificate(
783 "Unlikely error. This entry in the certificate hashmap should be present"
784 .to_string(),
785 ))?
786 .contains_key(&new_fingerprint)
787 {
788 return Err(StateError::ReplaceCertificate(format!(
789 "Failed to insert the new certificate for address {}",
790 replace.address
791 )));
792 }
793 Ok(())
794 }
795
796 fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
797 let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
798
799 let tcp_frontend = TcpFrontend {
800 cluster_id: front.cluster_id.clone(),
801 address: front.address.into(),
802 tags: front.tags.clone(),
803 };
804 if tcp_frontends.contains(&tcp_frontend) {
805 return Err(StateError::Exists {
806 kind: ObjectKind::TcpFrontend,
807 id: format!("{tcp_frontend:?}"),
808 });
809 }
810
811 tcp_frontends.push(tcp_frontend);
812 Ok(())
813 }
814
815 fn remove_tcp_frontend(
816 &mut self,
817 front_to_remove: &RequestTcpFrontend,
818 ) -> Result<(), StateError> {
819 let tcp_frontends =
820 self.tcp_fronts
821 .get_mut(&front_to_remove.cluster_id)
822 .ok_or(StateError::NotFound {
823 kind: ObjectKind::TcpFrontend,
824 id: format!("{front_to_remove:?}"),
825 })?;
826
827 let len = tcp_frontends.len();
828 tcp_frontends.retain(|front| front.address != front_to_remove.address.into());
829 if tcp_frontends.len() == len {
830 return Err(StateError::NoChange);
831 }
832 Ok(())
833 }
834
835 fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
836 let backend = Backend {
837 address: add_backend.address.into(),
838 cluster_id: add_backend.cluster_id.clone(),
839 backend_id: add_backend.backend_id.clone(),
840 sticky_id: add_backend.sticky_id.clone(),
841 load_balancing_parameters: add_backend.load_balancing_parameters,
842 backup: add_backend.backup,
843 };
844 let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
845
846 backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
848 backends.push(backend);
849 backends.sort();
850
851 Ok(())
852 }
853
854 fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
855 let backend_list =
856 self.backends
857 .get_mut(&backend.cluster_id)
858 .ok_or(StateError::NotFound {
859 kind: ObjectKind::Backend,
860 id: backend.backend_id.to_owned(),
861 })?;
862
863 let len = backend_list.len();
864 let remove_address = backend.address.into();
865 backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
866 backend_list.sort();
867 if backend_list.len() == len {
868 return Err(StateError::NoChange);
869 }
870 Ok(())
871 }
872
873 fn generate_requests(&self) -> Vec<Request> {
875 let mut v: Vec<Request> = Vec::new();
876
877 for listener in self.http_listeners.values() {
878 v.push(RequestType::AddHttpListener(listener.clone()).into());
879 if listener.active {
880 v.push(
881 RequestType::ActivateListener(ActivateListener {
882 address: listener.address,
883 proxy: ListenerType::Http.into(),
884 from_scm: false,
885 })
886 .into(),
887 );
888 }
889 }
890
891 for listener in self.https_listeners.values() {
892 v.push(RequestType::AddHttpsListener(listener.clone()).into());
893 if listener.active {
894 v.push(
895 RequestType::ActivateListener(ActivateListener {
896 address: listener.address,
897 proxy: ListenerType::Https.into(),
898 from_scm: false,
899 })
900 .into(),
901 );
902 }
903 }
904
905 for listener in self.tcp_listeners.values() {
906 v.push(RequestType::AddTcpListener(*listener).into());
907 if listener.active {
908 v.push(
909 RequestType::ActivateListener(ActivateListener {
910 address: listener.address,
911 proxy: ListenerType::Tcp.into(),
912 from_scm: false,
913 })
914 .into(),
915 );
916 }
917 }
918
919 for cluster in self.clusters.values() {
920 v.push(RequestType::AddCluster(cluster.clone()).into());
921 }
922
923 for front in self.http_fronts.values() {
924 v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
925 }
926
927 for (front, certs) in self.certificates.iter() {
928 for certificate_and_key in certs.values() {
929 v.push(
930 RequestType::AddCertificate(AddCertificate {
931 address: SocketAddress::from(*front),
932 certificate: certificate_and_key.clone(),
933 expired_at: None,
934 })
935 .into(),
936 );
937 }
938 }
939
940 for front in self.https_fronts.values() {
941 v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
942 }
943
944 for front_list in self.tcp_fronts.values() {
945 for front in front_list {
946 v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
947 }
948 }
949
950 for backend_list in self.backends.values() {
951 for backend in backend_list {
952 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
953 }
954 }
955
956 v
957 }
958
959 pub fn generate_activate_requests(&self) -> Vec<Request> {
960 let mut v: Vec<Request> = Vec::new();
961 for front in self
962 .http_listeners
963 .iter()
964 .filter(|(_, listener)| listener.active)
965 .map(|(k, _)| k)
966 {
967 v.push(
968 RequestType::ActivateListener(ActivateListener {
969 address: SocketAddress::from(*front),
970 proxy: ListenerType::Http.into(),
971 from_scm: false,
972 })
973 .into(),
974 );
975 }
976
977 for front in self
978 .https_listeners
979 .iter()
980 .filter(|(_, listener)| listener.active)
981 .map(|(k, _)| k)
982 {
983 v.push(
984 RequestType::ActivateListener(ActivateListener {
985 address: SocketAddress::from(*front),
986 proxy: ListenerType::Https.into(),
987 from_scm: false,
988 })
989 .into(),
990 );
991 }
992 for front in self
993 .tcp_listeners
994 .iter()
995 .filter(|(_, listener)| listener.active)
996 .map(|(k, _)| k)
997 {
998 v.push(
999 RequestType::ActivateListener(ActivateListener {
1000 address: SocketAddress::from(*front),
1001 proxy: ListenerType::Tcp.into(),
1002 from_scm: false,
1003 })
1004 .into(),
1005 );
1006 }
1007
1008 v
1009 }
1010
1011 pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
1012 let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
1014 let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
1015 let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
1016 let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);
1017
1018 let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
1019 let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
1020 let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
1021 let added_http_listeners = their_http_listeners.difference(&my_http_listeners);
1022
1023 let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
1024 let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
1025 let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
1026 let added_https_listeners = their_https_listeners.difference(&my_https_listeners);
1027
1028 let mut v: Vec<Request> = vec![];
1029
1030 for address in removed_tcp_listeners {
1031 if self.tcp_listeners[*address].active {
1032 v.push(
1033 RequestType::DeactivateListener(DeactivateListener {
1034 address: SocketAddress::from(**address),
1035 proxy: ListenerType::Tcp.into(),
1036 to_scm: false,
1037 })
1038 .into(),
1039 );
1040 }
1041
1042 v.push(
1043 RequestType::RemoveListener(RemoveListener {
1044 address: SocketAddress::from(**address),
1045 proxy: ListenerType::Tcp.into(),
1046 })
1047 .into(),
1048 );
1049 }
1050
1051 for address in added_tcp_listeners.clone() {
1052 v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());
1053
1054 if other.tcp_listeners[*address].active {
1055 v.push(
1056 RequestType::ActivateListener(ActivateListener {
1057 address: SocketAddress::from(**address),
1058 proxy: ListenerType::Tcp.into(),
1059 from_scm: false,
1060 })
1061 .into(),
1062 );
1063 }
1064 }
1065
1066 for address in removed_http_listeners {
1067 if self.http_listeners[*address].active {
1068 v.push(
1069 RequestType::DeactivateListener(DeactivateListener {
1070 address: SocketAddress::from(**address),
1071 proxy: ListenerType::Http.into(),
1072 to_scm: false,
1073 })
1074 .into(),
1075 );
1076 }
1077
1078 v.push(
1079 RequestType::RemoveListener(RemoveListener {
1080 address: SocketAddress::from(**address),
1081 proxy: ListenerType::Http.into(),
1082 })
1083 .into(),
1084 );
1085 }
1086
1087 for address in added_http_listeners.clone() {
1088 v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());
1089
1090 if other.http_listeners[*address].active {
1091 v.push(
1092 RequestType::ActivateListener(ActivateListener {
1093 address: SocketAddress::from(**address),
1094 proxy: ListenerType::Http.into(),
1095 from_scm: false,
1096 })
1097 .into(),
1098 );
1099 }
1100 }
1101
1102 for address in removed_https_listeners {
1103 if self.https_listeners[*address].active {
1104 v.push(
1105 RequestType::DeactivateListener(DeactivateListener {
1106 address: SocketAddress::from(**address),
1107 proxy: ListenerType::Https.into(),
1108 to_scm: false,
1109 })
1110 .into(),
1111 );
1112 }
1113
1114 v.push(
1115 RequestType::RemoveListener(RemoveListener {
1116 address: SocketAddress::from(**address),
1117 proxy: ListenerType::Https.into(),
1118 })
1119 .into(),
1120 );
1121 }
1122
1123 for address in added_https_listeners.clone() {
1124 v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());
1125
1126 if other.https_listeners[*address].active {
1127 v.push(
1128 RequestType::ActivateListener(ActivateListener {
1129 address: SocketAddress::from(**address),
1130 proxy: ListenerType::Https.into(),
1131 from_scm: false,
1132 })
1133 .into(),
1134 );
1135 }
1136 }
1137
1138 for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
1139 let my_listener = &self.tcp_listeners[*addr];
1140 let their_listener = &other.tcp_listeners[*addr];
1141
1142 if my_listener != their_listener {
1143 v.push(
1144 RequestType::RemoveListener(RemoveListener {
1145 address: SocketAddress::from(**addr),
1146 proxy: ListenerType::Tcp.into(),
1147 })
1148 .into(),
1149 );
1150 let mut listener_to_add = *their_listener;
1152 listener_to_add.active = false;
1153 v.push(RequestType::AddTcpListener(listener_to_add).into());
1154 }
1155
1156 if my_listener.active && !their_listener.active {
1157 v.push(
1158 RequestType::DeactivateListener(DeactivateListener {
1159 address: SocketAddress::from(**addr),
1160 proxy: ListenerType::Tcp.into(),
1161 to_scm: false,
1162 })
1163 .into(),
1164 );
1165 }
1166
1167 if !my_listener.active && their_listener.active {
1168 v.push(
1169 RequestType::ActivateListener(ActivateListener {
1170 address: SocketAddress::from(**addr),
1171 proxy: ListenerType::Tcp.into(),
1172 from_scm: false,
1173 })
1174 .into(),
1175 );
1176 }
1177 }
1178
1179 for addr in my_http_listeners.intersection(&their_http_listeners) {
1180 let my_listener = &self.http_listeners[*addr];
1181 let their_listener = &other.http_listeners[*addr];
1182
1183 if my_listener != their_listener {
1184 v.push(
1185 RequestType::RemoveListener(RemoveListener {
1186 address: SocketAddress::from(**addr),
1187 proxy: ListenerType::Http.into(),
1188 })
1189 .into(),
1190 );
1191 let mut listener_to_add = their_listener.clone();
1193 listener_to_add.active = false;
1194 v.push(RequestType::AddHttpListener(listener_to_add).into());
1195 }
1196
1197 if my_listener.active && !their_listener.active {
1198 v.push(
1199 RequestType::DeactivateListener(DeactivateListener {
1200 address: SocketAddress::from(**addr),
1201 proxy: ListenerType::Http.into(),
1202 to_scm: false,
1203 })
1204 .into(),
1205 );
1206 }
1207
1208 if !my_listener.active && their_listener.active {
1209 v.push(
1210 RequestType::ActivateListener(ActivateListener {
1211 address: SocketAddress::from(**addr),
1212 proxy: ListenerType::Http.into(),
1213 from_scm: false,
1214 })
1215 .into(),
1216 );
1217 }
1218 }
1219
1220 for addr in my_https_listeners.intersection(&their_https_listeners) {
1221 let my_listener = &self.https_listeners[*addr];
1222 let their_listener = &other.https_listeners[*addr];
1223
1224 if my_listener != their_listener {
1225 v.push(
1226 RequestType::RemoveListener(RemoveListener {
1227 address: SocketAddress::from(**addr),
1228 proxy: ListenerType::Https.into(),
1229 })
1230 .into(),
1231 );
1232 let mut listener_to_add = their_listener.clone();
1234 listener_to_add.active = false;
1235 v.push(RequestType::AddHttpsListener(listener_to_add).into());
1236 }
1237
1238 if my_listener.active && !their_listener.active {
1239 v.push(
1240 RequestType::DeactivateListener(DeactivateListener {
1241 address: SocketAddress::from(**addr),
1242 proxy: ListenerType::Https.into(),
1243 to_scm: false,
1244 })
1245 .into(),
1246 );
1247 }
1248
1249 if !my_listener.active && their_listener.active {
1250 v.push(
1251 RequestType::ActivateListener(ActivateListener {
1252 address: SocketAddress::from(**addr),
1253 proxy: ListenerType::Https.into(),
1254 from_scm: false,
1255 })
1256 .into(),
1257 );
1258 }
1259 }
1260
1261 for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
1262 match res {
1263 DiffResult::Added | DiffResult::Changed => v.push(
1264 RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
1265 ),
1266 DiffResult::Removed => {
1267 v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
1268 }
1269 }
1270 }
1271
1272 for ((cluster_id, backend_id), res) in diff_map(
1273 self.backends.iter().flat_map(|(cluster_id, v)| {
1274 v.iter()
1275 .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1276 }),
1277 other.backends.iter().flat_map(|(cluster_id, v)| {
1278 v.iter()
1279 .map(move |backend| ((cluster_id, &backend.backend_id), backend))
1280 }),
1281 ) {
1282 match res {
1283 DiffResult::Added => {
1284 let backend = other
1285 .backends
1286 .get(cluster_id)
1287 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1288 .unwrap();
1289 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
1290 }
1291 DiffResult::Removed => {
1292 let backend = self
1293 .backends
1294 .get(cluster_id)
1295 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1296 .unwrap();
1297
1298 v.push(
1299 RequestType::RemoveBackend(RemoveBackend {
1300 cluster_id: backend.cluster_id.clone(),
1301 backend_id: backend.backend_id.clone(),
1302 address: SocketAddress::from(backend.address),
1303 })
1304 .into(),
1305 );
1306 }
1307 DiffResult::Changed => {
1308 let backend = self
1309 .backends
1310 .get(cluster_id)
1311 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1312 .unwrap();
1313
1314 v.push(
1315 RequestType::RemoveBackend(RemoveBackend {
1316 cluster_id: backend.cluster_id.clone(),
1317 backend_id: backend.backend_id.clone(),
1318 address: SocketAddress::from(backend.address),
1319 })
1320 .into(),
1321 );
1322
1323 let backend = other
1324 .backends
1325 .get(cluster_id)
1326 .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
1327 .unwrap();
1328 v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
1329 }
1330 }
1331 }
1332
1333 let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
1334 for (route, front) in self.http_fronts.iter() {
1335 my_http_fronts.insert((route, front));
1336 }
1337 let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
1338 for (route, front) in other.http_fronts.iter() {
1339 their_http_fronts.insert((route, front));
1340 }
1341
1342 let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
1343 let added_http_fronts = their_http_fronts.difference(&my_http_fronts);
1344
1345 for &(_, front) in removed_http_fronts {
1346 v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
1347 }
1348
1349 for &(_, front) in added_http_fronts {
1350 v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
1351 }
1352
1353 let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1354 for (route, front) in self.https_fronts.iter() {
1355 my_https_fronts.insert((route, front));
1356 }
1357 let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1358 for (route, front) in other.https_fronts.iter() {
1359 their_https_fronts.insert((route, front));
1360 }
1361 let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
1362 let added_https_fronts = their_https_fronts.difference(&my_https_fronts);
1363
1364 for &(_, front) in removed_https_fronts {
1365 v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
1366 }
1367
1368 for &(_, front) in added_https_fronts {
1369 v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
1370 }
1371
1372 let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1373 for (cluster_id, front_list) in self.tcp_fronts.iter() {
1374 for front in front_list.iter() {
1375 my_tcp_fronts.insert((cluster_id, front));
1376 }
1377 }
1378 let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1379 for (cluster_id, front_list) in other.tcp_fronts.iter() {
1380 for front in front_list.iter() {
1381 their_tcp_fronts.insert((cluster_id, front));
1382 }
1383 }
1384
1385 let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
1386 let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);
1387
1388 for &(_, front) in removed_tcp_fronts {
1389 v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
1390 }
1391
1392 for &(_, front) in added_tcp_fronts {
1393 v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
1394 }
1395
1396 let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1398 self.certificates
1399 .iter()
1400 .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1401 );
1402 let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1403 other
1404 .certificates
1405 .iter()
1406 .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1407 );
1408
1409 let removed_certificates = my_certificates.difference(&their_certificates);
1410 let added_certificates = their_certificates.difference(&my_certificates);
1411
1412 for &(address, fingerprint) in removed_certificates {
1413 v.push(
1414 RequestType::RemoveCertificate(RemoveCertificate {
1415 address: SocketAddress::from(address),
1416 fingerprint: fingerprint.to_string(),
1417 })
1418 .into(),
1419 );
1420 }
1421
1422 for &(address, fingerprint) in added_certificates {
1423 if let Some(certificate_and_key) = other
1424 .certificates
1425 .get(&address)
1426 .and_then(|certs| certs.get(fingerprint))
1427 {
1428 v.push(
1429 RequestType::AddCertificate(AddCertificate {
1430 address: SocketAddress::from(address),
1431 certificate: certificate_and_key.clone(),
1432 expired_at: None,
1433 })
1434 .into(),
1435 );
1436 }
1437 }
1438
1439 for address in added_tcp_listeners {
1440 let listener = &other.tcp_listeners[*address];
1441 if listener.active {
1442 v.push(
1443 RequestType::ActivateListener(ActivateListener {
1444 address: listener.address,
1445 proxy: ListenerType::Tcp.into(),
1446 from_scm: false,
1447 })
1448 .into(),
1449 );
1450 }
1451 }
1452
1453 v
1454 }
1455
1456 pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
1458 let mut hm: HashMap<ClusterId, DefaultHasher> = self
1459 .clusters
1460 .keys()
1461 .map(|cluster_id| {
1462 let mut hasher = DefaultHasher::new();
1463 self.clusters.get(cluster_id).hash(&mut hasher);
1464 if let Some(backends) = self.backends.get(cluster_id) {
1465 backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1466 }
1467 if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
1468 tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1469 }
1470 (cluster_id.to_owned(), hasher)
1471 })
1472 .collect();
1473
1474 for front in self.http_fronts.values() {
1475 if let Some(cluster_id) = &front.cluster_id {
1476 if let Some(hasher) = hm.get_mut(cluster_id) {
1477 front.hash(hasher);
1478 }
1479 }
1480 }
1481
1482 for front in self.https_fronts.values() {
1483 if let Some(cluster_id) = &front.cluster_id {
1484 if let Some(hasher) = hm.get_mut(cluster_id) {
1485 front.hash(hasher);
1486 }
1487 }
1488 }
1489
1490 hm.drain()
1491 .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
1492 .collect()
1493 }
1494
1495 pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
1498 let configuration = self.clusters.get(cluster_id).cloned()?;
1499 info!("{:?}", configuration);
1500
1501 let http_frontends: Vec<RequestHttpFrontend> = self
1502 .http_fronts
1503 .values()
1504 .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1505 .map(|front| front.clone().into())
1506 .collect();
1507
1508 let https_frontends: Vec<RequestHttpFrontend> = self
1509 .https_fronts
1510 .values()
1511 .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1512 .map(|front| front.clone().into())
1513 .collect();
1514
1515 let tcp_frontends: Vec<RequestTcpFrontend> = self
1516 .tcp_fronts
1517 .get(cluster_id)
1518 .cloned()
1519 .unwrap_or_default()
1520 .iter()
1521 .map(|front| front.clone().into())
1522 .collect();
1523
1524 let backends: Vec<AddBackend> = self
1525 .backends
1526 .get(cluster_id)
1527 .cloned()
1528 .unwrap_or_default()
1529 .iter()
1530 .map(|backend| backend.clone().into())
1531 .collect();
1532
1533 Some(ClusterInformation {
1534 configuration: Some(configuration),
1535 http_frontends,
1536 https_frontends,
1537 tcp_frontends,
1538 backends,
1539 })
1540 }
1541
1542 pub fn count_backends(&self) -> usize {
1543 self.backends.values().fold(0, |acc, v| acc + v.len())
1544 }
1545
1546 pub fn count_frontends(&self) -> usize {
1547 self.http_fronts.values().count()
1548 + self.https_fronts.values().count()
1549 + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
1550 }
1551
1552 pub fn get_cluster_ids_by_domain(
1553 &self,
1554 hostname: String,
1555 path: Option<String>,
1556 ) -> HashSet<ClusterId> {
1557 let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
1558
1559 self.http_fronts.values().for_each(|front| {
1560 if domain_check(&front.hostname, &front.path, &hostname, &path) {
1561 if let Some(id) = &front.cluster_id {
1562 cluster_ids.insert(id.to_string());
1563 }
1564 }
1565 });
1566
1567 self.https_fronts.values().for_each(|front| {
1568 if domain_check(&front.hostname, &front.path, &hostname, &path) {
1569 if let Some(id) = &front.cluster_id {
1570 cluster_ids.insert(id.to_string());
1571 }
1572 }
1573 });
1574
1575 cluster_ids
1576 }
1577
1578 pub fn get_certificates(
1579 &self,
1580 filters: QueryCertificatesFilters,
1581 ) -> BTreeMap<String, CertificateAndKey> {
1582 self.certificates
1583 .values()
1584 .flat_map(|hash_map| hash_map.iter())
1585 .filter(|(fingerprint, cert)| {
1586 if let Some(domain) = &filters.domain {
1587 cert.names.contains(domain)
1588 } else if let Some(f) = &filters.fingerprint {
1589 fingerprint.to_string() == *f
1590 } else {
1591 true
1592 }
1593 })
1594 .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
1595 .collect()
1596 }
1597
1598 pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
1599 let list_all = !filters.http && !filters.https && !filters.tcp;
1601
1602 let mut listed_frontends = ListedFrontends::default();
1603
1604 if filters.http || list_all {
1605 for http_frontend in self.http_fronts.iter().filter(|f| {
1606 if let Some(domain) = &filters.domain {
1607 f.1.hostname.contains(domain)
1608 } else {
1609 true
1610 }
1611 }) {
1612 listed_frontends
1613 .http_frontends
1614 .push(http_frontend.1.to_owned().into());
1615 }
1616 }
1617
1618 if filters.https || list_all {
1619 for https_frontend in self.https_fronts.iter().filter(|f| {
1620 if let Some(domain) = &filters.domain {
1621 f.1.hostname.contains(domain)
1622 } else {
1623 true
1624 }
1625 }) {
1626 listed_frontends
1627 .https_frontends
1628 .push(https_frontend.1.to_owned().into());
1629 }
1630 }
1631
1632 if (filters.tcp || list_all) && filters.domain.is_none() {
1633 for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
1634 listed_frontends
1635 .tcp_frontends
1636 .push(tcp_frontend.to_owned().into())
1637 }
1638 }
1639
1640 listed_frontends
1641 }
1642
1643 pub fn list_listeners(&self) -> ListenersList {
1644 ListenersList {
1645 http_listeners: self
1646 .http_listeners
1647 .iter()
1648 .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1649 .collect(),
1650 https_listeners: self
1651 .https_listeners
1652 .iter()
1653 .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1654 .collect(),
1655 tcp_listeners: self
1656 .tcp_listeners
1657 .iter()
1658 .map(|(addr, listener)| (addr.to_string(), *listener))
1659 .collect(),
1660 }
1661 }
1662
1663 pub fn produce_initial_state(&self) -> InitialState {
1665 let mut worker_requests = Vec::new();
1666 for (counter, request) in self.generate_requests().into_iter().enumerate() {
1667 worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
1668 }
1669 InitialState {
1670 requests: worker_requests,
1671 }
1672 }
1673
1674 pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1677 let initial_state = self.produce_initial_state();
1678 let count = initial_state.requests.len();
1679
1680 let bytes_to_write = initial_state.encode_to_vec();
1681 println!("writing {} in the temp file", bytes_to_write.len());
1682 file.write_all(&bytes_to_write)
1683 .map_err(StateError::FileError)?;
1684
1685 file.sync_all().map_err(StateError::FileError)?;
1686
1687 Ok(count)
1688 }
1689
1690 pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1694 let mut counter = 0usize;
1695 let requests = self.generate_requests();
1696
1697 for request in requests {
1698 let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
1699
1700 file.write_all(
1701 &serde_json::to_string(&message)
1702 .map(|s| s.into_bytes())
1703 .unwrap_or_default(),
1704 )
1705 .map_err(StateError::FileError)?;
1706
1707 file.write_all(&b"\n\0"[..])
1708 .map_err(StateError::FileError)?;
1709
1710 if counter % 1000 == 0 {
1711 info!("writing {} commands to file", counter);
1712 file.sync_all().map_err(StateError::FileError)?;
1713 }
1714 counter += 1;
1715 }
1716 file.sync_all().map_err(StateError::FileError)?;
1717
1718 Ok(counter)
1719 }
1720}
1721
1722pub fn validate_h2_flood_knobs_http(patch: &UpdateHttpListenerConfig) -> Result<(), StateError> {
1736 macro_rules! require_ge1 {
1737 ($field:expr, $name:literal) => {
1738 if let Some(0) = $field {
1739 return Err(StateError::InvalidValue {
1740 field: $name,
1741 reason: "must be >= 1",
1742 });
1743 }
1744 };
1745 }
1746 require_ge1!(
1747 patch.h2_max_rst_stream_per_window,
1748 "h2_max_rst_stream_per_window"
1749 );
1750 require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
1751 require_ge1!(
1752 patch.h2_max_settings_per_window,
1753 "h2_max_settings_per_window"
1754 );
1755 require_ge1!(
1756 patch.h2_max_empty_data_per_window,
1757 "h2_max_empty_data_per_window"
1758 );
1759 require_ge1!(
1760 patch.h2_max_continuation_frames,
1761 "h2_max_continuation_frames"
1762 );
1763 require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
1764 require_ge1!(
1765 patch.h2_max_window_update_stream0_per_window,
1766 "h2_max_window_update_stream0_per_window"
1767 );
1768 require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
1769 if let Some(v) = patch.h2_stream_shrink_ratio {
1772 if v < 2 {
1773 return Err(StateError::InvalidValue {
1774 field: "h2_stream_shrink_ratio",
1775 reason: "must be >= 2",
1776 });
1777 }
1778 }
1779 require_ge1!(
1782 patch.h2_max_rst_stream_lifetime,
1783 "h2_max_rst_stream_lifetime"
1784 );
1785 require_ge1!(
1786 patch.h2_max_rst_stream_abusive_lifetime,
1787 "h2_max_rst_stream_abusive_lifetime"
1788 );
1789 require_ge1!(
1790 patch.h2_max_rst_stream_emitted_lifetime,
1791 "h2_max_rst_stream_emitted_lifetime"
1792 );
1793 require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
1794 require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
1795 Ok(())
1796}
1797
1798pub fn validate_h2_flood_knobs_https(patch: &UpdateHttpsListenerConfig) -> Result<(), StateError> {
1800 macro_rules! require_ge1 {
1801 ($field:expr, $name:literal) => {
1802 if let Some(0) = $field {
1803 return Err(StateError::InvalidValue {
1804 field: $name,
1805 reason: "must be >= 1",
1806 });
1807 }
1808 };
1809 }
1810 require_ge1!(
1811 patch.h2_max_rst_stream_per_window,
1812 "h2_max_rst_stream_per_window"
1813 );
1814 require_ge1!(patch.h2_max_ping_per_window, "h2_max_ping_per_window");
1815 require_ge1!(
1816 patch.h2_max_settings_per_window,
1817 "h2_max_settings_per_window"
1818 );
1819 require_ge1!(
1820 patch.h2_max_empty_data_per_window,
1821 "h2_max_empty_data_per_window"
1822 );
1823 require_ge1!(
1824 patch.h2_max_continuation_frames,
1825 "h2_max_continuation_frames"
1826 );
1827 require_ge1!(patch.h2_max_glitch_count, "h2_max_glitch_count");
1828 require_ge1!(
1829 patch.h2_max_window_update_stream0_per_window,
1830 "h2_max_window_update_stream0_per_window"
1831 );
1832 require_ge1!(patch.h2_max_concurrent_streams, "h2_max_concurrent_streams");
1833 if let Some(v) = patch.h2_stream_shrink_ratio {
1834 if v < 2 {
1835 return Err(StateError::InvalidValue {
1836 field: "h2_stream_shrink_ratio",
1837 reason: "must be >= 2",
1838 });
1839 }
1840 }
1841 require_ge1!(
1842 patch.h2_max_rst_stream_lifetime,
1843 "h2_max_rst_stream_lifetime"
1844 );
1845 require_ge1!(
1846 patch.h2_max_rst_stream_abusive_lifetime,
1847 "h2_max_rst_stream_abusive_lifetime"
1848 );
1849 require_ge1!(
1850 patch.h2_max_rst_stream_emitted_lifetime,
1851 "h2_max_rst_stream_emitted_lifetime"
1852 );
1853 require_ge1!(patch.h2_max_header_list_size, "h2_max_header_list_size");
1854 require_ge1!(patch.h2_max_header_table_size, "h2_max_header_table_size");
1855 Ok(())
1856}
1857
1858pub fn merge_custom_http_answers(
1871 target: &mut Option<CustomHttpAnswers>,
1872 patch: &CustomHttpAnswers,
1873) {
1874 let current = target.get_or_insert_with(CustomHttpAnswers::default);
1875 macro_rules! merge_field {
1876 ($field:ident) => {
1877 if let Some(ref v) = patch.$field {
1878 current.$field = Some(v.clone());
1879 }
1880 };
1881 }
1882 merge_field!(answer_301);
1883 merge_field!(answer_400);
1884 merge_field!(answer_401);
1885 merge_field!(answer_404);
1886 merge_field!(answer_408);
1887 merge_field!(answer_413);
1888 merge_field!(answer_421);
1889 merge_field!(answer_502);
1890 merge_field!(answer_503);
1891 merge_field!(answer_504);
1892 merge_field!(answer_507);
1893}
1894
1895pub fn validate_alpn_protocols(values: &[String]) -> Result<(), StateError> {
1898 for value in values {
1899 if value != "h2" && value != "http/1.1" {
1900 return Err(StateError::InvalidValue {
1901 field: "alpn_protocols",
1902 reason: "each value must be \"h2\" or \"http/1.1\"",
1903 });
1904 }
1905 }
1906 Ok(())
1907}
1908
1909pub fn validate_sozu_id_header(value: &str) -> Result<(), StateError> {
1920 if value.is_empty() {
1921 return Err(StateError::InvalidValue {
1922 field: "sozu_id_header",
1923 reason: "must not be empty",
1924 });
1925 }
1926 for b in value.bytes() {
1927 let is_tchar = b.is_ascii_alphanumeric()
1928 || matches!(
1929 b,
1930 b'!' | b'#'
1931 | b'$'
1932 | b'%'
1933 | b'&'
1934 | b'\''
1935 | b'*'
1936 | b'+'
1937 | b'-'
1938 | b'.'
1939 | b'^'
1940 | b'_'
1941 | b'`'
1942 | b'|'
1943 | b'~'
1944 );
1945 if !is_tchar {
1946 return Err(StateError::InvalidValue {
1947 field: "sozu_id_header",
1948 reason: "must be a valid HTTP header name (RFC 9110 §5.1 token: alphanumeric or one of !#$%&'*+-.^_`|~)",
1949 });
1950 }
1951 }
1952 Ok(())
1953}
1954
1955fn domain_check(
1956 front_hostname: &str,
1957 front_path_rule: &PathRule,
1958 hostname: &str,
1959 path_prefix: &Option<String>,
1960) -> bool {
1961 if hostname != front_hostname {
1962 return false;
1963 }
1964
1965 if let Some(path) = &path_prefix {
1966 return path == &front_path_rule.value;
1967 }
1968
1969 true
1970}
1971
/// Iterator state behind [`diff_map`]: the two source iterators plus, for each
/// side, an entry that was pulled but has not been reported yet.
struct DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
{
    my_it: I1,
    other_it: I2,
    my: Option<(K, &'a V)>,
    other: Option<(K, &'a V)>,
}

/// Compares two keyed sequences and yields, per key, how `other` differs from
/// `my`.
///
/// The comparison walks both iterators in lockstep by comparing keys, so it only
/// gives a complete result when both iterators yield their keys in ascending
/// order. Keys whose values are equal on both sides are not reported.
fn diff_map<'a, K, V, I1, I2>(my: I1, other: I2) -> DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    DiffMap {
        my_it: my,
        other_it: other,
        my: None,
        other: None,
    }
}

/// Kind of difference reported by [`diff_map`] for one key.
enum DiffResult {
    /// The key is only yielded by the `other` iterator.
    Added,
    /// The key is only yielded by the `my` iterator.
    Removed,
    /// The key is yielded by both iterators, with different values.
    Changed,
}

impl<'a, K, V, I1, I2> std::iter::Iterator for DiffMap<'a, K, V, I1, I2>
where
    K: Ord,
    V: PartialEq,
    I1: Iterator<Item = (K, &'a V)>,
    I2: Iterator<Item = (K, &'a V)>,
{
    type Item = (K, DiffResult);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // reuse the entry kept from the previous round, or pull a fresh one
            let mine = self.my.take().or_else(|| self.my_it.next());
            let theirs = self.other.take().or_else(|| self.other_it.next());

            let (my_key, my_value, their_key, their_value) = match (mine, theirs) {
                (None, None) => return None,
                // `my` is exhausted: whatever is left in `other` was added
                (None, Some((key, _))) => return Some((key, DiffResult::Added)),
                // `other` is exhausted: whatever is left in `my` was removed
                (Some((key, _)), None) => return Some((key, DiffResult::Removed)),
                (Some((k1, v1)), Some((k2, v2))) => (k1, v1, k2, v2),
            };

            if my_key < their_key {
                // `other` is ahead: keep its entry for the next round
                self.other = Some((their_key, their_value));
                return Some((my_key, DiffResult::Removed));
            }
            if my_key > their_key {
                // `my` is ahead: keep its entry for the next round
                self.my = Some((my_key, my_value));
                return Some((their_key, DiffResult::Added));
            }
            if my_value != their_value {
                return Some((my_key, DiffResult::Changed));
            }
            // same key, same value: nothing to report, move on to the next pair
        }
    }
}
2048
2049#[cfg(test)]
2050mod tests {
2051 use rand::{RngExt, rng, seq::SliceRandom};
2052
2053 use super::*;
2054 use crate::proto::command::{
2055 CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RequestTcpFrontend,
2056 RulePosition,
2057 };
2058
2059 #[test]
2060 fn serialize() {
2061 let mut state: ConfigState = Default::default();
2062 state
2063 .dispatch(
2064 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2065 cluster_id: Some(String::from("cluster_1")),
2066 hostname: String::from("lolcatho.st:8080"),
2067 path: PathRule::prefix(String::from("/")),
2068 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2069 position: RulePosition::Tree.into(),
2070 ..Default::default()
2071 })
2072 .into(),
2073 )
2074 .expect("Could not execute request");
2075 state
2076 .dispatch(
2077 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2078 cluster_id: Some(String::from("cluster_2")),
2079 hostname: String::from("test.local"),
2080 path: PathRule::prefix(String::from("/abc")),
2081 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2082 position: RulePosition::Pre.into(),
2083 ..Default::default()
2084 })
2085 .into(),
2086 )
2087 .expect("Could not execute request");
2088 state
2089 .dispatch(
2090 &RequestType::AddBackend(AddBackend {
2091 cluster_id: String::from("cluster_1"),
2092 backend_id: String::from("cluster_1-0"),
2093 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2094 ..Default::default()
2095 })
2096 .into(),
2097 )
2098 .expect("Could not execute request");
2099 state
2100 .dispatch(
2101 &RequestType::AddBackend(AddBackend {
2102 cluster_id: String::from("cluster_1"),
2103 backend_id: String::from("cluster_1-1"),
2104 address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
2105 ..Default::default()
2106 })
2107 .into(),
2108 )
2109 .expect("Could not execute request");
2110 state
2111 .dispatch(
2112 &RequestType::AddBackend(AddBackend {
2113 cluster_id: String::from("cluster_2"),
2114 backend_id: String::from("cluster_2-0"),
2115 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2116 ..Default::default()
2117 })
2118 .into(),
2119 )
2120 .expect("Could not execute request");
2121 state
2122 .dispatch(
2123 &RequestType::AddBackend(AddBackend {
2124 cluster_id: String::from("cluster_1"),
2125 backend_id: String::from("cluster_1-3"),
2126 address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2127 ..Default::default()
2128 })
2129 .into(),
2130 )
2131 .expect("Could not execute request");
2132 state
2133 .dispatch(
2134 &RequestType::RemoveBackend(RemoveBackend {
2135 cluster_id: String::from("cluster_1"),
2136 backend_id: String::from("cluster_1-3"),
2137 address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
2138 })
2139 .into(),
2140 )
2141 .expect("Could not execute request");
2142
2143 }
2153
2154 #[test]
2155 fn diff() {
2156 let mut state: ConfigState = Default::default();
2157 state
2158 .dispatch(
2159 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2160 cluster_id: Some(String::from("cluster_1")),
2161 hostname: String::from("lolcatho.st:8080"),
2162 path: PathRule::prefix(String::from("/")),
2163 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2164 position: RulePosition::Post.into(),
2165 ..Default::default()
2166 })
2167 .into(),
2168 )
2169 .expect("Could not execute request");
2170 state
2171 .dispatch(
2172 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2173 cluster_id: Some(String::from("cluster_2")),
2174 hostname: String::from("test.local"),
2175 path: PathRule::prefix(String::from("/abc")),
2176 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2177 ..Default::default()
2178 })
2179 .into(),
2180 )
2181 .expect("Could not execute request");
2182 state
2183 .dispatch(
2184 &RequestType::AddBackend(AddBackend {
2185 cluster_id: String::from("cluster_1"),
2186 backend_id: String::from("cluster_1-0"),
2187 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2188 load_balancing_parameters: Some(LoadBalancingParams::default()),
2189 ..Default::default()
2190 })
2191 .into(),
2192 )
2193 .expect("Could not execute request");
2194 state
2195 .dispatch(
2196 &RequestType::AddBackend(AddBackend {
2197 cluster_id: String::from("cluster_1"),
2198 backend_id: String::from("cluster_1-1"),
2199 address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
2200 load_balancing_parameters: Some(LoadBalancingParams::default()),
2201 ..Default::default()
2202 })
2203 .into(),
2204 )
2205 .expect("Could not execute request");
2206 state
2207 .dispatch(
2208 &RequestType::AddBackend(AddBackend {
2209 cluster_id: String::from("cluster_2"),
2210 backend_id: String::from("cluster_2-0"),
2211 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2212 load_balancing_parameters: Some(LoadBalancingParams::default()),
2213 ..Default::default()
2214 })
2215 .into(),
2216 )
2217 .expect("Could not execute request");
2218 state
2219 .dispatch(
2220 &RequestType::AddCluster(Cluster {
2221 cluster_id: String::from("cluster_2"),
2222 sticky_session: true,
2223 https_redirect: true,
2224 ..Default::default()
2225 })
2226 .into(),
2227 )
2228 .expect("Could not execute request");
2229
2230 let mut state2: ConfigState = Default::default();
2231 state2
2232 .dispatch(
2233 &RequestType::AddHttpFrontend(RequestHttpFrontend {
2234 cluster_id: Some(String::from("cluster_1")),
2235 hostname: String::from("lolcatho.st:8080"),
2236 path: PathRule::prefix(String::from("/")),
2237 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2238 position: RulePosition::Post.into(),
2239 ..Default::default()
2240 })
2241 .into(),
2242 )
2243 .expect("Could not execute request");
2244 state2
2245 .dispatch(
2246 &RequestType::AddBackend(AddBackend {
2247 cluster_id: String::from("cluster_1"),
2248 backend_id: String::from("cluster_1-0"),
2249 address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
2250 load_balancing_parameters: Some(LoadBalancingParams::default()),
2251 ..Default::default()
2252 })
2253 .into(),
2254 )
2255 .expect("Could not execute request");
2256 state2
2257 .dispatch(
2258 &RequestType::AddBackend(AddBackend {
2259 cluster_id: String::from("cluster_1"),
2260 backend_id: String::from("cluster_1-1"),
2261 address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
2262 load_balancing_parameters: Some(LoadBalancingParams::default()),
2263 ..Default::default()
2264 })
2265 .into(),
2266 )
2267 .expect("Could not execute request");
2268 state2
2269 .dispatch(
2270 &RequestType::AddBackend(AddBackend {
2271 cluster_id: String::from("cluster_1"),
2272 backend_id: String::from("cluster_1-2"),
2273 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2274 load_balancing_parameters: Some(LoadBalancingParams::default()),
2275 ..Default::default()
2276 })
2277 .into(),
2278 )
2279 .expect("Could not execute request");
2280 state2
2281 .dispatch(
2282 &RequestType::AddCluster(Cluster {
2283 cluster_id: String::from("cluster_3"),
2284 sticky_session: false,
2285 https_redirect: false,
2286 ..Default::default()
2287 })
2288 .into(),
2289 )
2290 .expect("Could not execute request");
2291
2292 let e: Vec<Request> = vec![
2293 RequestType::RemoveHttpFrontend(RequestHttpFrontend {
2294 cluster_id: Some(String::from("cluster_2")),
2295 hostname: String::from("test.local"),
2296 path: PathRule::prefix(String::from("/abc")),
2297 address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2298 ..Default::default()
2299 })
2300 .into(),
2301 RequestType::RemoveBackend(RemoveBackend {
2302 cluster_id: String::from("cluster_2"),
2303 backend_id: String::from("cluster_2-0"),
2304 address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
2305 })
2306 .into(),
2307 RequestType::AddBackend(AddBackend {
2308 cluster_id: String::from("cluster_1"),
2309 backend_id: String::from("cluster_1-2"),
2310 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2311 load_balancing_parameters: Some(LoadBalancingParams::default()),
2312 ..Default::default()
2313 })
2314 .into(),
2315 RequestType::RemoveCluster(String::from("cluster_2")).into(),
2316 RequestType::AddCluster(Cluster {
2317 cluster_id: String::from("cluster_3"),
2318 sticky_session: false,
2319 https_redirect: false,
2320 ..Default::default()
2321 })
2322 .into(),
2323 ];
2324 let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
2325
2326 let d = state.diff(&state2);
2327 let diff = HashSet::from_iter(d.iter());
2328 println!("diff requests:\n{diff:#?}\n");
2329 println!("expected diff requests:\n{expected_diff:#?}\n");
2330
2331 let hash1 = state.hash_state();
2332 let hash2 = state2.hash_state();
2333 let mut state3 = state.clone();
2334 state3
2335 .dispatch(
2336 &RequestType::AddBackend(AddBackend {
2337 cluster_id: String::from("cluster_1"),
2338 backend_id: String::from("cluster_1-2"),
2339 address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
2340 load_balancing_parameters: Some(LoadBalancingParams::default()),
2341 ..Default::default()
2342 })
2343 .into(),
2344 )
2345 .expect("Could not execute request");
2346 let hash3 = state3.hash_state();
2347 println!("state 1 hashes: {hash1:#?}");
2348 println!("state 2 hashes: {hash2:#?}");
2349 println!("state 3 hashes: {hash3:#?}");
2350
2351 assert_eq!(diff, expected_diff);
2352 }
2353
/// Registers HTTP and HTTPS frontends for two clusters under one hostname and checks
/// the cluster ids returned by `get_cluster_ids_by_domain`, with and without a path.
#[test]
fn cluster_ids_by_domain() {
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let https_address = SocketAddress::new_v4(0, 0, 0, 0, 8443);

    // All frontends share the hostname; only cluster, path and listener address vary.
    let frontend = |cluster: &str, path: &str, address: SocketAddress| RequestHttpFrontend {
        cluster_id: Some(cluster.to_owned()),
        hostname: "lolcatho.st".to_owned(),
        path: PathRule::prefix(path.to_owned()),
        address,
        ..Default::default()
    };

    let mut config = ConfigState::new();
    let additions = [
        RequestType::AddHttpFrontend(frontend("MyCluster_1", "", http_address)),
        RequestType::AddHttpFrontend(frontend("MyCluster_2", "/api", http_address)),
        RequestType::AddHttpsFrontend(frontend("MyCluster_1", "", https_address)),
        RequestType::AddHttpsFrontend(frontend("MyCluster_2", "/api", https_address)),
    ];
    for addition in additions {
        config
            .dispatch(&addition.into())
            .expect("Could not execute request");
    }

    let both_clusters: HashSet<ClusterId> = HashSet::from([
        String::from("MyCluster_1"),
        String::from("MyCluster_2"),
    ]);
    let second_cluster_only: HashSet<ClusterId> = HashSet::from([String::from("MyCluster_2")]);
    let no_cluster: HashSet<ClusterId> = HashSet::new();

    // Known hostname, no path filter.
    let found = config.get_cluster_ids_by_domain("lolcatho.st".to_owned(), None);
    assert_eq!(found, both_clusters);

    // Known hostname, narrowed by path.
    let found =
        config.get_cluster_ids_by_domain("lolcatho.st".to_owned(), Some("/api".to_owned()));
    assert_eq!(found, second_cluster_only);

    // Unknown hostname, with and without a path.
    let found = config.get_cluster_ids_by_domain("lolcathost".to_owned(), None);
    assert_eq!(found, no_cluster);

    let found =
        config.get_cluster_ids_by_domain("lolcathost".to_owned(), Some("/sozu".to_owned()));
    assert_eq!(found, no_cluster);
}
2429
/// Adds the same backend (same cluster, id and address) twice and expects the state to
/// hold a single entry equal to the second definition.
#[test]
fn duplicate_backends() {
    let mut state = ConfigState::default();

    let first_definition: Request = RequestType::AddBackend(AddBackend {
        cluster_id: "cluster_1".to_owned(),
        backend_id: "cluster_1-0".to_owned(),
        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&first_definition)
        .expect("Could not execute request");

    // Same identity as above, but carrying a sticky id.
    let b = Backend {
        cluster_id: "cluster_1".to_owned(),
        backend_id: "cluster_1-0".to_owned(),
        address: "127.0.0.1:1026".parse().unwrap(),
        load_balancing_parameters: Some(LoadBalancingParams::default()),
        sticky_id: Some(String::from("sticky")),
        backup: None,
    };

    let second_definition: Request = RequestType::AddBackend(b.clone().to_add_backend()).into();
    state
        .dispatch(&second_definition)
        .expect("Could not execute order");

    let expected = vec![b];
    let stored = state.backends.get("cluster_1").unwrap();
    assert_eq!(stored, &expected);
}
2461
/// Removing a backend shrinks the cluster's backend list by one; replaying the same
/// removal is reported as `StateError::NoChange` and leaves the list untouched.
#[test]
fn remove_backend() {
    let mut state = ConfigState::default();

    let add_cluster: Request = RequestType::AddCluster(Cluster {
        cluster_id: "cluster_1".to_owned(),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&add_cluster)
        .expect("Could not execute request");

    // Ten backends that differ only by id; they all share one address.
    let shared_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);
    for backend_id in (0..10).map(|i| format!("cluster_1-{i}")) {
        let add_backend: Request = RequestType::AddBackend(AddBackend {
            cluster_id: "cluster_1".to_owned(),
            backend_id,
            address: shared_address,
            ..Default::default()
        })
        .into();
        state
            .dispatch(&add_backend)
            .expect("Could not execute request");
    }

    assert_eq!(state.backends.get("cluster_1").unwrap().len(), 10);

    let removal: Request = RequestType::RemoveBackend(RemoveBackend {
        cluster_id: "cluster_1".to_owned(),
        backend_id: "cluster_1-0".to_owned(),
        address: shared_address,
    })
    .into();

    let first_attempt = state.dispatch(&removal);
    assert!(first_attempt.is_ok());
    assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);

    let second_attempt = state.dispatch(&removal);
    assert!(matches!(second_attempt, Err(StateError::NoChange)));
    assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
}
2507
/// Repeatedly (re-)adds ten backends, then removes a random non-empty strict subset of
/// them in random order, expecting every removal to succeed.
#[test]
fn remove_backends_randomly() {
    let mut state = ConfigState::default();

    let add_cluster: Request = RequestType::AddCluster(Cluster {
        cluster_id: "cluster_1".to_owned(),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&add_cluster)
        .expect("Could not execute request");

    let shared_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);

    for _round in 0..1000 {
        for backend_id in (0..10).map(|i| format!("cluster_1-{i}")) {
            let add_backend: Request = RequestType::AddBackend(AddBackend {
                cluster_id: "cluster_1".to_owned(),
                backend_id,
                address: shared_address,
                ..Default::default()
            })
            .into();
            state
                .dispatch(&add_backend)
                .expect("Could not execute request");
        }

        // Shuffle first, then draw how many to remove (between 1 and 9).
        let mut generator = rng();
        let mut indexes: Vec<i32> = (0..10).collect();
        indexes.shuffle(&mut generator);
        let random_count = generator.random_range(1..indexes.len());

        for j in indexes.iter().take(random_count) {
            let removal: Request = RequestType::RemoveBackend(RemoveBackend {
                cluster_id: "cluster_1".to_owned(),
                backend_id: format!("cluster_1-{j}"),
                address: shared_address,
            })
            .into();
            let remove_backend_result = state.dispatch(&removal);
            assert!(remove_backend_result.is_ok());
        }
    }
}
2555
/// Builds two states with the same three listeners (TCP, HTTP, HTTPS) but different
/// configuration and activation, then checks the exact request sequence produced by
/// `state.diff(&state2)`.
#[test]
fn listener_diff() {
    let tcp_address = SocketAddress::new_v4(0, 0, 0, 0, 1234);
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let https_address = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let custom_http_answers = Some(CustomHttpAnswers {
        answer_404: Some("test".to_string()),
        ..Default::default()
    });

    // Small request builders shared by the setup of both states and the expectation.
    let activate = |address: SocketAddress, proxy: ListenerType| -> Request {
        RequestType::ActivateListener(ActivateListener {
            address,
            proxy: proxy.into(),
            from_scm: false,
        })
        .into()
    };
    let remove = |address: SocketAddress, proxy: ListenerType| -> Request {
        RequestType::RemoveListener(RemoveListener {
            address,
            proxy: proxy.into(),
        })
        .into()
    };
    let tcp_expecting_proxy = || -> Request {
        RequestType::AddTcpListener(TcpListenerConfig {
            address: tcp_address,
            expect_proxy: true,
            ..Default::default()
        })
        .into()
    };
    let http_with_answers = || -> Request {
        RequestType::AddHttpListener(HttpListenerConfig {
            address: http_address,
            http_answers: custom_http_answers.clone(),
            ..Default::default()
        })
        .into()
    };
    let https_with_answers = || -> Request {
        RequestType::AddHttpsListener(HttpsListenerConfig {
            address: https_address,
            http_answers: custom_http_answers.clone(),
            ..Default::default()
        })
        .into()
    };

    // First state: default configs; TCP and HTTPS active, HTTP inactive.
    let mut state = ConfigState::default();
    let first_setup: Vec<Request> = vec![
        RequestType::AddTcpListener(TcpListenerConfig {
            address: tcp_address,
            ..Default::default()
        })
        .into(),
        activate(tcp_address, ListenerType::Tcp),
        RequestType::AddHttpListener(HttpListenerConfig {
            address: http_address,
            ..Default::default()
        })
        .into(),
        RequestType::AddHttpsListener(HttpsListenerConfig {
            address: https_address,
            ..Default::default()
        })
        .into(),
        activate(https_address, ListenerType::Https),
    ];
    for request in &first_setup {
        state.dispatch(request).expect("Could not execute request");
    }

    // Second state: customised configs; HTTP and HTTPS active, TCP inactive.
    let mut state2 = ConfigState::default();
    let second_setup: Vec<Request> = vec![
        tcp_expecting_proxy(),
        http_with_answers(),
        activate(http_address, ListenerType::Http),
        https_with_answers(),
        activate(https_address, ListenerType::Https),
    ];
    for request in &second_setup {
        state2.dispatch(request).expect("Could not execute request");
    }

    let expected: Vec<Request> = vec![
        remove(tcp_address, ListenerType::Tcp),
        tcp_expecting_proxy(),
        RequestType::DeactivateListener(DeactivateListener {
            address: tcp_address,
            proxy: ListenerType::Tcp.into(),
            to_scm: false,
        })
        .into(),
        remove(http_address, ListenerType::Http),
        http_with_answers(),
        activate(http_address, ListenerType::Http),
        remove(https_address, ListenerType::Https),
        https_with_answers(),
    ];

    let diff = state.diff(&state2);
    println!("expected diff requests:\n{expected:#?}\n");
    println!("diff requests:\n{diff:#?}\n");

    // Hashing both states is exercised here but the values are not asserted on.
    let _hash1 = state.hash_state();
    let _hash2 = state2.hash_state();

    assert_eq!(diff, expected);
}
2721
/// Adds one certificate and checks it can be queried back both by fingerprint and by
/// domain name through `get_certificates`.
#[test]
fn certificate_retrieval() {
    let add_certificate = AddCertificate {
        address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
        certificate: CertificateAndKey {
            certificate: include_str!("../assets/certificate.pem").to_owned(),
            key: include_str!("../assets/key.pem").to_owned(),
            certificate_chain: Vec::new(),
            versions: Vec::new(),
            names: vec![String::from("lolcatho.st")],
        },
        expired_at: None,
    };

    let mut state = ConfigState::default();
    let request: Request = RequestType::AddCertificate(add_certificate).into();
    state.dispatch(&request).expect("Could not add certificate");

    println!("state: {state:#?}");

    let by_fingerprint = QueryCertificatesFilters {
        domain: None,
        fingerprint: Some(String::from(
            "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5",
        )),
    };
    let certificates_found_by_fingerprint = state.get_certificates(by_fingerprint);

    println!("found certificate: {certificates_found_by_fingerprint:#?}");

    assert!(!certificates_found_by_fingerprint.is_empty());

    let by_domain = QueryCertificatesFilters {
        domain: Some(String::from("lolcatho.st")),
        fingerprint: None,
    };
    let certificate_found_by_domain_name = state.get_certificates(by_domain);

    assert!(!certificate_found_by_domain_name.is_empty());
}
2766
/// Tracks `count_backends` while backends are added to two clusters and one is removed.
#[test]
fn count_backends_across_clusters() {
    // Builds an AddBackend request with every other field left at its default.
    let add = |cluster_id: &str, backend_id: &str, address: SocketAddress| -> Request {
        RequestType::AddBackend(AddBackend {
            cluster_id: cluster_id.to_owned(),
            backend_id: backend_id.to_owned(),
            address,
            ..Default::default()
        })
        .into()
    };

    let mut state = ConfigState::default();
    assert_eq!(state.count_backends(), 0);

    let first_address = SocketAddress::new_v4(127, 0, 0, 1, 1026);

    state
        .dispatch(&add("cluster_1", "cluster_1-0", first_address))
        .expect("Could not execute request");
    assert_eq!(state.count_backends(), 1);

    state
        .dispatch(&add(
            "cluster_1",
            "cluster_1-1",
            SocketAddress::new_v4(127, 0, 0, 1, 1027),
        ))
        .expect("Could not execute request");
    assert_eq!(state.count_backends(), 2);

    // A backend in another cluster is counted as well.
    state
        .dispatch(&add(
            "cluster_2",
            "cluster_2-0",
            SocketAddress::new_v4(192, 168, 1, 1, 8080),
        ))
        .expect("Could not execute request");
    assert_eq!(state.count_backends(), 3);

    let removal: Request = RequestType::RemoveBackend(RemoveBackend {
        cluster_id: "cluster_1".to_owned(),
        backend_id: "cluster_1-0".to_owned(),
        address: first_address,
    })
    .into();
    state
        .dispatch(&removal)
        .expect("Could not execute request");
    assert_eq!(state.count_backends(), 2);
}
2826
/// Tracks `count_frontends` while HTTP, HTTPS and TCP frontends are added and one HTTP
/// frontend is removed.
#[test]
fn count_frontends_across_types() {
    // HTTP(S) frontend definition on path "/" with tree positioning.
    let web_frontend = |hostname: &str, address: SocketAddress| RequestHttpFrontend {
        cluster_id: Some("cluster_1".to_owned()),
        hostname: hostname.to_owned(),
        path: PathRule::prefix("/".to_owned()),
        address,
        position: RulePosition::Tree.into(),
        ..Default::default()
    };
    let tcp_frontend = |address: SocketAddress| -> Request {
        RequestType::AddTcpFrontend(RequestTcpFrontend {
            cluster_id: "cluster_2".to_owned(),
            address,
            ..Default::default()
        })
        .into()
    };
    let http_address = SocketAddress::new_v4(0, 0, 0, 0, 8080);

    let mut state = ConfigState::default();
    assert_eq!(state.count_frontends(), 0);

    let add_http: Request =
        RequestType::AddHttpFrontend(web_frontend("example.com", http_address)).into();
    state
        .dispatch(&add_http)
        .expect("Could not execute request");
    assert_eq!(state.count_frontends(), 1);

    let add_https: Request = RequestType::AddHttpsFrontend(web_frontend(
        "secure.example.com",
        SocketAddress::new_v4(0, 0, 0, 0, 8443),
    ))
    .into();
    state
        .dispatch(&add_https)
        .expect("Could not execute request");
    assert_eq!(state.count_frontends(), 2);

    state
        .dispatch(&tcp_frontend(SocketAddress::new_v4(0, 0, 0, 0, 5432)))
        .expect("Could not execute request");
    assert_eq!(state.count_frontends(), 3);

    state
        .dispatch(&tcp_frontend(SocketAddress::new_v4(0, 0, 0, 0, 5433)))
        .expect("Could not execute request");
    assert_eq!(state.count_frontends(), 4);

    // Remove the very same HTTP frontend that was added first.
    let remove_http: Request =
        RequestType::RemoveHttpFrontend(web_frontend("example.com", http_address)).into();
    state
        .dispatch(&remove_http)
        .expect("Could not execute request");
    assert_eq!(state.count_frontends(), 3);
}
2907
2908 fn make_https_listener(address: SocketAddress) -> HttpsListenerConfig {
2911 HttpsListenerConfig {
2912 address,
2913 sticky_name: "SOZUBALANCEID".to_owned(),
2914 front_timeout: 60,
2915 back_timeout: 30,
2916 connect_timeout: 3,
2917 request_timeout: 10,
2918 ..Default::default()
2919 }
2920 }
2921
2922 fn make_http_listener(address: SocketAddress) -> HttpListenerConfig {
2923 HttpListenerConfig {
2924 address,
2925 sticky_name: "SOZUBALANCEID".to_owned(),
2926 front_timeout: 60,
2927 back_timeout: 30,
2928 connect_timeout: 3,
2929 request_timeout: 10,
2930 ..Default::default()
2931 }
2932 }
2933
2934 fn make_tcp_listener(address: SocketAddress) -> TcpListenerConfig {
2935 TcpListenerConfig {
2936 address,
2937 front_timeout: 60,
2938 back_timeout: 30,
2939 connect_timeout: 3,
2940 ..Default::default()
2941 }
2942 }
2943
/// Patching two H2 flood knobs on an existing HTTPS listener sets exactly those fields
/// and leaves the unpatched ones as they were.
#[test]
fn update_https_listener_happy_path_h2_knobs() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_max_rst_stream_per_window: Some(50),
        h2_max_ping_per_window: Some(20),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).expect("update must succeed");

    let key = SocketAddr::from(addr);
    let listener = state
        .https_listeners
        .get(&key)
        .expect("listener must be present");

    // Patched fields.
    assert_eq!(listener.h2_max_rst_stream_per_window, Some(50));
    assert_eq!(listener.h2_max_ping_per_window, Some(20));
    // Fields absent from the patch.
    assert_eq!(listener.front_timeout, 60);
    assert_eq!(listener.h2_max_settings_per_window, None);
}
2976
/// Updating an HTTPS listener that was never added yields `StateError::NotFound` with
/// kind `HttpsListener`.
#[test]
fn update_https_listener_not_found() {
    let mut state = ConfigState::new();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: SocketAddress::new_v4(1, 2, 3, 4, 9999),
        h2_max_rst_stream_per_window: Some(50),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::NotFound {
            kind: ObjectKind::HttpsListener,
            ..
        } => {}
        err => panic!("expected NotFound, got: {err}"),
    }
}
3001
/// A patch carrying only the address is accepted and leaves the checked listener fields
/// equal to the originally added configuration.
#[test]
fn update_https_listener_noop_patch() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let original = make_https_listener(addr);

    let mut state = ConfigState::new();
    let add_listener: Request = RequestType::AddHttpsListener(original.clone()).into();
    state.dispatch(&add_listener).unwrap();

    let empty_update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        ..Default::default()
    })
    .into();
    state
        .dispatch(&empty_update)
        .expect("no-op patch must succeed");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(listener.front_timeout, original.front_timeout);
    assert_eq!(
        listener.h2_max_rst_stream_per_window,
        original.h2_max_rst_stream_per_window
    );
}
3028
/// Setting the `h2_max_rst_stream_per_window` flood knob to zero is rejected with
/// `StateError::InvalidValue` naming that field.
#[test]
fn update_https_listener_invalid_value_flood_knob_zero() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_max_rst_stream_per_window: Some(0),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::InvalidValue {
            field: "h2_max_rst_stream_per_window",
            ..
        } => {}
        err => panic!("expected InvalidValue for flood knob 0, got: {err}"),
    }
}
3057
/// A cluster whose health-check URI contains CRLF is rejected with
/// `StateError::InvalidValue` on `health_check`, and the cluster is not stored.
#[test]
fn add_cluster_invalid_health_check_uri_rejected() {
    use crate::proto::command::HealthCheckConfig;

    let crlf_health_check = HealthCheckConfig {
        uri: "/foo\r\nGET /admin".to_owned(),
        interval: 5_000,
        timeout: 1_000,
        healthy_threshold: 2,
        unhealthy_threshold: 2,
        ..Default::default()
    };
    let add_cluster: Request = RequestType::AddCluster(Cluster {
        cluster_id: "evil_cluster".to_owned(),
        health_check: Some(crlf_health_check),
        ..Default::default()
    })
    .into();

    let mut state = ConfigState::new();
    let err = state.dispatch(&add_cluster).unwrap_err();

    let rejected_on_health_check = matches!(
        err,
        StateError::InvalidValue {
            field: "health_check",
            ..
        }
    );
    assert!(
        rejected_on_health_check,
        "expected InvalidValue for CRLF-bearing health-check URI, got: {err}"
    );

    let cluster_was_stored = state.clusters.contains_key("evil_cluster");
    assert!(
        !cluster_was_stored,
        "cluster must not be inserted when health_check fails validation",
    );
}
3101
/// An ALPN patch containing the value "h3" is rejected with `StateError::InvalidValue`
/// on `alpn_protocols`.
#[test]
fn update_https_listener_alpn_unknown_value_rejected() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let unsupported = AlpnProtocols {
        values: vec![String::from("h3")],
    };
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(unsupported),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::InvalidValue {
            field: "alpn_protocols",
            ..
        } => {}
        err => panic!("expected InvalidValue for unknown ALPN, got: {err}"),
    }
}
3134
/// Patching with an explicitly empty ALPN list is accepted and clears the listener's
/// previously configured ALPN protocols.
#[test]
fn update_https_listener_alpn_empty_reset_accepted() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let initial = HttpsListenerConfig {
        alpn_protocols: vec![String::from("h2")],
        ..make_https_listener(addr)
    };

    let mut state = ConfigState::new();
    let add_listener: Request = RequestType::AddHttpsListener(initial).into();
    state.dispatch(&add_listener).unwrap();

    let reset: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(AlpnProtocols { values: Vec::new() }),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&reset)
        .expect("empty ALPN reset must succeed");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert!(
        listener.alpn_protocols.is_empty(),
        "ALPN must have been reset to empty"
    );
}
3163
/// An ALPN patch listing "h2" and "http/1.1" is accepted and stored in that order.
#[test]
fn update_https_listener_alpn_valid_values_accepted() {
    use crate::proto::command::AlpnProtocols;

    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let supported = AlpnProtocols {
        values: vec![String::from("h2"), String::from("http/1.1")],
    };
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        alpn_protocols: Some(supported),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("valid ALPN must be accepted");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(listener.alpn_protocols, vec!["h2", "http/1.1"]);
}
3189
/// A patch that does not mention ALPN (only `front_timeout`) keeps the listener's
/// existing ALPN list.
#[test]
fn update_https_listener_alpn_absent_preserves_existing() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let initial = HttpsListenerConfig {
        alpn_protocols: vec![String::from("h2")],
        ..make_https_listener(addr)
    };

    let mut state = ConfigState::new();
    let add_listener: Request = RequestType::AddHttpsListener(initial).into();
    state.dispatch(&add_listener).unwrap();

    // The patch leaves `alpn_protocols` unset.
    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        front_timeout: Some(10),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).unwrap();

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(
        listener.alpn_protocols,
        vec!["h2"],
        "ALPN must be preserved when not patched"
    );
}
3219
/// An empty `sozu_id_header` value is rejected with `StateError::InvalidValue` on that
/// field.
#[test]
fn update_https_listener_sozu_id_header_empty_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some(String::new()),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::InvalidValue {
            field: "sozu_id_header",
            ..
        } => {}
        err => panic!("expected InvalidValue for empty header name, got: {err}"),
    }
}
3248
/// A `sozu_id_header` value containing a colon ("bad: value") is rejected with
/// `StateError::InvalidValue` on that field.
#[test]
fn update_https_listener_sozu_id_header_colon_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some(String::from("bad: value")),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::InvalidValue {
            field: "sozu_id_header",
            ..
        } => {}
        err => panic!("expected InvalidValue for header name with colon, got: {err}"),
    }
}
3277
/// A well-formed `sozu_id_header` ("X-Edge-Id") is accepted and stored on the listener.
#[test]
fn update_https_listener_sozu_id_header_valid_accepted() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        sozu_id_header: Some(String::from("X-Edge-Id")),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("valid header name must be accepted");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(listener.sozu_id_header.as_deref(), Some("X-Edge-Id"));
}
3299
/// Unlike the flood knobs, `h2_graceful_shutdown_deadline_seconds` accepts zero and the
/// value is stored on the listener.
#[test]
fn update_https_listener_graceful_shutdown_zero_allowed() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8443);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpsListener(make_https_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpsListener(UpdateHttpsListenerConfig {
        address: addr,
        h2_graceful_shutdown_deadline_seconds: Some(0),
        ..Default::default()
    })
    .into();
    state
        .dispatch(&update)
        .expect("graceful_shutdown_deadline=0 must be allowed");

    let key = SocketAddr::from(addr);
    let listener = state.https_listeners.get(&key).unwrap();
    assert_eq!(listener.h2_graceful_shutdown_deadline_seconds, Some(0));
}
3321
/// Patching `front_timeout` and one H2 flood knob on an HTTP listener applies both and
/// leaves `back_timeout` at its original value.
#[test]
fn update_http_listener_happy_path() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpListener(make_http_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpListener(UpdateHttpListenerConfig {
        address: addr,
        front_timeout: Some(15),
        h2_max_rst_stream_per_window: Some(25),
        ..Default::default()
    })
    .into();
    state.dispatch(&update).expect("HTTP update must succeed");

    let key = SocketAddr::from(addr);
    let listener = state.http_listeners.get(&key).unwrap();
    // Patched fields.
    assert_eq!(listener.front_timeout, 15);
    assert_eq!(listener.h2_max_rst_stream_per_window, Some(25));
    // Field absent from the patch.
    assert_eq!(listener.back_timeout, 30);
}
3349
/// Setting `h2_max_window_update_stream0_per_window` to zero on an HTTP listener is
/// rejected with `StateError::InvalidValue` naming that field.
#[test]
fn update_http_listener_flood_knob_zero_rejected() {
    let addr = SocketAddress::new_v4(0, 0, 0, 0, 8080);
    let mut state = ConfigState::new();

    let add_listener: Request = RequestType::AddHttpListener(make_http_listener(addr)).into();
    state.dispatch(&add_listener).unwrap();

    let update: Request = RequestType::UpdateHttpListener(UpdateHttpListenerConfig {
        address: addr,
        h2_max_window_update_stream0_per_window: Some(0),
        ..Default::default()
    })
    .into();

    match state.dispatch(&update).unwrap_err() {
        StateError::InvalidValue {
            field: "h2_max_window_update_stream0_per_window",
            ..
        } => {}
        err => panic!("expected InvalidValue, got: {err}"),
    }
}
3378
#[test]
// Dispatching an `UpdateTcpListener` patch against an existing TCP listener
// succeeds and the patched `front_timeout` is visible on the stored listener.
fn update_tcp_listener_happy_path() {
    let listen_on = SocketAddress::new_v4(0, 0, 0, 0, 9000);
    let mut config = ConfigState::new();
    let add_listener: Request =
        RequestType::AddTcpListener(make_tcp_listener(listen_on)).into();
    config.dispatch(&add_listener).unwrap();

    // Only `front_timeout` is set in the patch.
    let update: Request = RequestType::UpdateTcpListener(UpdateTcpListenerConfig {
        address: listen_on,
        front_timeout: Some(5),
        ..Default::default()
    })
    .into();
    config.dispatch(&update).expect("TCP update must succeed");

    let key = SocketAddr::from(listen_on);
    let stored = config.tcp_listeners.get(&key).unwrap();
    assert_eq!(stored.front_timeout, 5);
    // `back_timeout` is absent from the patch and is expected to still be 30
    // (presumably the value `make_tcp_listener` sets; confirm against the helper).
    assert_eq!(stored.back_timeout, 30);
}
3403
#[test]
// Updating a TCP listener on a freshly created state (no listener was added)
// must fail with `StateError::NotFound` for `ObjectKind::TcpListener`.
fn update_tcp_listener_not_found() {
    let mut config = ConfigState::new();

    let unknown_address = SocketAddress::new_v4(9, 9, 9, 9, 9999);
    let update: Request = RequestType::UpdateTcpListener(UpdateTcpListenerConfig {
        address: unknown_address,
        front_timeout: Some(5),
        ..Default::default()
    })
    .into();

    // The dispatch has to fail; `unwrap_err` panics if it unexpectedly succeeds.
    let err = config.dispatch(&update).unwrap_err();
    let is_missing_tcp_listener = matches!(
        err,
        StateError::NotFound {
            kind: ObjectKind::TcpListener,
            ..
        }
    );
    if !is_missing_tcp_listener {
        panic!("expected NotFound, got: {err}");
    }
}
3427
#[test]
// A `SetMetricDetail` request dispatched on a fresh state must return `Ok`;
// per the expectation message, the failure guarded against is
// `UndispatchableRequest`.
fn dispatch_passes_through_set_metric_detail() {
    use crate::proto::command::{MetricDetail, SetMetricDetail};

    // Build the payload first, then wrap it into a `Request`.
    let payload = SetMetricDetail {
        client_id: String::from("test:1"),
        detail: Some(MetricDetail::DetailBackend as i32),
        ttl_seconds: Some(60),
        clear: Some(false),
        reason: Some(String::from("regression-guard")),
        peer_pid: None,
        peer_session_ulid: None,
    };
    let request: Request = RequestType::SetMetricDetail(payload).into();

    let mut config = ConfigState::new();
    config
        .dispatch(&request)
        .expect("SetMetricDetail must traverse dispatch without UndispatchableRequest");
}
3451}