sozu_command_lib/
state.rs

1use std::{
2    collections::{
3        btree_map::Entry as BTreeMapEntry, hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap,
4        HashSet,
5    },
6    fs::File,
7    hash::{Hash, Hasher},
8    io::Write,
9    iter::repeat,
10    net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16    certificate::{calculate_fingerprint, CertificateError, Fingerprint},
17    proto::{
18        command::{
19            request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey,
20            Cluster, ClusterInformation, DeactivateListener, FrontendFilters, HttpListenerConfig,
21            HttpsListenerConfig, InitialState, ListedFrontends, ListenerType, ListenersList,
22            PathRule, QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
23            ReplaceCertificate, Request, RequestCounts, RequestHttpFrontend, RequestTcpFrontend,
24            SocketAddress, TcpListenerConfig, WorkerRequest,
25        },
26        display::format_request_type,
27    },
28    response::{Backend, HttpFrontend, TcpFrontend},
29    ObjectKind,
30};
31
32/// To use throughout Sōzu
33pub type ClusterId = String;
34
35#[derive(thiserror::Error, Debug)]
36pub enum StateError {
37    #[error("Request came in empty")]
38    EmptyRequest,
39    #[error("dispatching this request did not bring any change to the state")]
40    NoChange,
41    #[error("State can not handle this request")]
42    UndispatchableRequest,
43    #[error("Did not find {kind:?} with address or id '{id}'")]
44    NotFound { kind: ObjectKind, id: String },
45    #[error("{kind:?} '{id}' already exists")]
46    Exists { kind: ObjectKind, id: String },
47    #[error("Wrong field value: {0}")]
48    WrongFieldValue(UnknownEnumValue),
49    #[error("Could not add certificate: {0}")]
50    AddCertificate(CertificateError),
51    #[error("Could not remove certificate: {0}")]
52    RemoveCertificate(String),
53    #[error("Could not replace certificate: {0}")]
54    ReplaceCertificate(String),
55    #[error(
56        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
57    )]
58    FrontendConversion { frontend: String, error: String },
59    #[error("Could not write state to file: {0}")]
60    FileError(std::io::Error),
61}
62
63/// The `ConfigState` represents the state of Sōzu's business, which is to forward traffic
64/// from frontends to backends. Hence, it contains all details about:
65///
66/// - listeners (socket addresses, for TCP and HTTP connections)
67/// - frontends (bind to a listener)
68/// - backends (to forward connections to)
69/// - clusters (routing rules from frontends to backends)
70/// - TLS certificates
71#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct ConfigState {
73    pub clusters: BTreeMap<ClusterId, Cluster>,
74    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
75    /// socket address -> HTTP listener
76    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
77    /// socket address -> HTTPS listener
78    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
79    /// socket address -> TCP listener
80    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
81    /// HTTP frontends, indexed by a summary of each front's address;hostname;path, for uniqueness.
82    /// For example: `"0.0.0.0:8080;lolcatho.st;P/api"`
83    pub http_fronts: BTreeMap<String, HttpFrontend>,
84    /// indexed by (address, hostname, path)
85    pub https_fronts: BTreeMap<String, HttpFrontend>,
86    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
87    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
88    /// A census of requests that were received. Name of the request -> number of occurences
89    pub request_counts: BTreeMap<String, i32>,
90}
91
92impl ConfigState {
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
98        let request_type = match &request.request_type {
99            Some(t) => t,
100            None => return Err(StateError::EmptyRequest),
101        };
102
103        self.increment_request_count(request);
104
105        match request_type {
106            RequestType::AddCluster(cluster) => self.add_cluster(cluster),
107            RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
108            RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
109            RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
110            RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
111            RequestType::RemoveListener(remove) => self.remove_listener(remove),
112            RequestType::ActivateListener(activate) => self.activate_listener(activate),
113            RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
114            RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
115            RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
116            RequestType::AddCertificate(add) => self.add_certificate(add),
117            RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
118            RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
119            RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
120            RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
121            RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
122            RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
123            RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
124            RequestType::RemoveBackend(backend) => self.remove_backend(backend),
125
126            // This is to avoid the error message
127            RequestType::Logging(_)
128            | RequestType::CountRequests(_)
129            | RequestType::Status(_)
130            | RequestType::SoftStop(_)
131            | RequestType::QueryCertificatesFromWorkers(_)
132            | RequestType::QueryClusterById(_)
133            | RequestType::QueryClustersByDomain(_)
134            | RequestType::QueryMetrics(_)
135            | RequestType::QueryClustersHashes(_)
136            | RequestType::ConfigureMetrics(_)
137            | RequestType::ReturnListenSockets(_)
138            | RequestType::HardStop(_) => Ok(()),
139
140            _other_request => Err(StateError::UndispatchableRequest),
141        }
142    }
143
144    /// Increments the count for this request type
145    fn increment_request_count(&mut self, request: &Request) {
146        if let Some(request_type) = &request.request_type {
147            let count = self
148                .request_counts
149                .entry(format_request_type(request_type).to_owned())
150                .or_insert(1);
151            *count += 1;
152        }
153    }
154
155    pub fn get_request_counts(&self) -> RequestCounts {
156        RequestCounts {
157            map: self.request_counts.clone(),
158        }
159    }
160
161    fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
162        let cluster = cluster.clone();
163        self.clusters.insert(cluster.cluster_id.clone(), cluster);
164        Ok(())
165    }
166
167    fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
168        match self.clusters.remove(cluster_id) {
169            Some(_) => Ok(()),
170            None => Err(StateError::NotFound {
171                kind: ObjectKind::Cluster,
172                id: cluster_id.to_owned(),
173            }),
174        }
175    }
176
177    fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
178        let address: SocketAddr = listener.address.into();
179        match self.http_listeners.entry(address) {
180            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
181            BTreeMapEntry::Occupied(_) => {
182                return Err(StateError::Exists {
183                    kind: ObjectKind::HttpListener,
184                    id: address.to_string(),
185                })
186            }
187        };
188        Ok(())
189    }
190
191    fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
192        let address: SocketAddr = listener.address.into();
193        match self.https_listeners.entry(address) {
194            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
195            BTreeMapEntry::Occupied(_) => {
196                return Err(StateError::Exists {
197                    kind: ObjectKind::HttpsListener,
198                    id: address.to_string(),
199                })
200            }
201        };
202        Ok(())
203    }
204
205    fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
206        let address: SocketAddr = listener.address.into();
207        match self.tcp_listeners.entry(address) {
208            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
209            BTreeMapEntry::Occupied(_) => {
210                return Err(StateError::Exists {
211                    kind: ObjectKind::TcpListener,
212                    id: address.to_string(),
213                })
214            }
215        };
216        Ok(())
217    }
218
219    fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
220        match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
221            ListenerType::Http => self.remove_http_listener(&remove.address.into()),
222            ListenerType::Https => self.remove_https_listener(&remove.address.into()),
223            ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
224        }
225    }
226
227    fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
228        if self.http_listeners.remove(address).is_none() {
229            return Err(StateError::NoChange);
230        }
231        Ok(())
232    }
233
234    fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
235        if self.https_listeners.remove(address).is_none() {
236            return Err(StateError::NoChange);
237        }
238        Ok(())
239    }
240
241    fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
242        if self.tcp_listeners.remove(address).is_none() {
243            return Err(StateError::NoChange);
244        }
245        Ok(())
246    }
247
248    fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
249        match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
250            ListenerType::Http => self
251                .http_listeners
252                .get_mut(&activate.address.into())
253                .map(|listener| listener.active = true)
254                .ok_or(StateError::NotFound {
255                    kind: ObjectKind::HttpListener,
256                    id: activate.address.to_string(),
257                }),
258            ListenerType::Https => self
259                .https_listeners
260                .get_mut(&activate.address.into())
261                .map(|listener| listener.active = true)
262                .ok_or(StateError::NotFound {
263                    kind: ObjectKind::HttpsListener,
264                    id: activate.address.to_string(),
265                }),
266            ListenerType::Tcp => self
267                .tcp_listeners
268                .get_mut(&activate.address.into())
269                .map(|listener| listener.active = true)
270                .ok_or(StateError::NotFound {
271                    kind: ObjectKind::TcpListener,
272                    id: activate.address.to_string(),
273                }),
274        }
275    }
276
277    fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
278        match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
279            ListenerType::Http => self
280                .http_listeners
281                .get_mut(&deactivate.address.into())
282                .map(|listener| listener.active = false)
283                .ok_or(StateError::NotFound {
284                    kind: ObjectKind::HttpListener,
285                    id: deactivate.address.to_string(),
286                }),
287            ListenerType::Https => self
288                .https_listeners
289                .get_mut(&deactivate.address.into())
290                .map(|listener| listener.active = false)
291                .ok_or(StateError::NotFound {
292                    kind: ObjectKind::HttpsListener,
293                    id: deactivate.address.to_string(),
294                }),
295            ListenerType::Tcp => self
296                .tcp_listeners
297                .get_mut(&deactivate.address.into())
298                .map(|listener| listener.active = false)
299                .ok_or(StateError::NotFound {
300                    kind: ObjectKind::TcpListener,
301                    id: deactivate.address.to_string(),
302                }),
303        }
304    }
305
306    fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
307        let front_as_key = front.to_string();
308
309        match self.http_fronts.entry(front.to_string()) {
310            BTreeMapEntry::Vacant(e) => {
311                e.insert(front.clone().to_frontend().map_err(|into_error| {
312                    StateError::FrontendConversion {
313                        frontend: front_as_key,
314                        error: into_error.to_string(),
315                    }
316                })?)
317            }
318            BTreeMapEntry::Occupied(_) => {
319                return Err(StateError::Exists {
320                    kind: ObjectKind::HttpFrontend,
321                    id: front.to_string(),
322                })
323            }
324        };
325        Ok(())
326    }
327
328    fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
329        let front_as_key = front.to_string();
330
331        match self.https_fronts.entry(front.to_string()) {
332            BTreeMapEntry::Vacant(e) => {
333                e.insert(front.clone().to_frontend().map_err(|into_error| {
334                    StateError::FrontendConversion {
335                        frontend: front_as_key,
336                        error: into_error.to_string(),
337                    }
338                })?)
339            }
340            BTreeMapEntry::Occupied(_) => {
341                return Err(StateError::Exists {
342                    kind: ObjectKind::HttpsFrontend,
343                    id: front.to_string(),
344                })
345            }
346        };
347        Ok(())
348    }
349
350    fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
351        self.http_fronts
352            .remove(&front.to_string())
353            .ok_or(StateError::NotFound {
354                kind: ObjectKind::HttpFrontend,
355                id: front.to_string(),
356            })?;
357        Ok(())
358    }
359
360    fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
361        self.https_fronts
362            .remove(&front.to_string())
363            .ok_or(StateError::NotFound {
364                kind: ObjectKind::HttpsFrontend,
365                id: front.to_string(),
366            })?;
367        Ok(())
368    }
369
370    fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
371        let fingerprint = add
372            .certificate
373            .fingerprint()
374            .map_err(StateError::AddCertificate)?;
375
376        let entry = self.certificates.entry(add.address.into()).or_default();
377
378        let mut add = add.clone();
379        add.certificate
380            .apply_overriding_names()
381            .map_err(StateError::AddCertificate)?;
382
383        if entry.contains_key(&fingerprint) {
384            info!(
385                "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
386                fingerprint, add.certificate.names.join(", "), add.address
387            );
388            return Ok(());
389        }
390
391        entry.insert(fingerprint, add.certificate);
392        Ok(())
393    }
394
395    fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
396        let fingerprint = Fingerprint(
397            hex::decode(&remove.fingerprint)
398                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
399        );
400
401        if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
402            index.remove(&fingerprint);
403        }
404
405        Ok(())
406    }
407
408    /// - Remove old certificate from certificates, using the old fingerprint
409    /// - calculate the new fingerprint
410    /// - insert the new certificate with the new fingerprint as key
411    /// - check that the new entry is present in the certificates hashmap
412    fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
413        let replace_address = replace.address.into();
414        let old_fingerprint = Fingerprint(
415            hex::decode(&replace.old_fingerprint)
416                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
417        );
418
419        self.certificates
420            .get_mut(&replace_address)
421            .ok_or(StateError::NotFound {
422                kind: ObjectKind::Certificate,
423                id: replace.address.to_string(),
424            })?
425            .remove(&old_fingerprint);
426
427        let new_fingerprint = Fingerprint(
428            calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
429                |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
430            )?,
431        );
432
433        self.certificates
434            .get_mut(&replace_address)
435            .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
436
437        if !self
438            .certificates
439            .get(&replace_address)
440            .ok_or(StateError::ReplaceCertificate(
441                "Unlikely error. This entry in the certificate hashmap should be present"
442                    .to_string(),
443            ))?
444            .contains_key(&new_fingerprint)
445        {
446            return Err(StateError::ReplaceCertificate(format!(
447                "Failed to insert the new certificate for address {}",
448                replace.address
449            )));
450        }
451        Ok(())
452    }
453
454    fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
455        let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
456
457        let tcp_frontend = TcpFrontend {
458            cluster_id: front.cluster_id.clone(),
459            address: front.address.into(),
460            tags: front.tags.clone(),
461        };
462        if tcp_frontends.contains(&tcp_frontend) {
463            return Err(StateError::Exists {
464                kind: ObjectKind::TcpFrontend,
465                id: format!("{:?}", tcp_frontend),
466            });
467        }
468
469        tcp_frontends.push(tcp_frontend);
470        Ok(())
471    }
472
473    fn remove_tcp_frontend(
474        &mut self,
475        front_to_remove: &RequestTcpFrontend,
476    ) -> Result<(), StateError> {
477        let tcp_frontends =
478            self.tcp_fronts
479                .get_mut(&front_to_remove.cluster_id)
480                .ok_or(StateError::NotFound {
481                    kind: ObjectKind::TcpFrontend,
482                    id: format!("{:?}", front_to_remove),
483                })?;
484
485        let len = tcp_frontends.len();
486        tcp_frontends.retain(|front| front.address != front_to_remove.address.into());
487        if tcp_frontends.len() == len {
488            return Err(StateError::NoChange);
489        }
490        Ok(())
491    }
492
493    fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
494        let backend = Backend {
495            address: add_backend.address.into(),
496            cluster_id: add_backend.cluster_id.clone(),
497            backend_id: add_backend.backend_id.clone(),
498            sticky_id: add_backend.sticky_id.clone(),
499            load_balancing_parameters: add_backend.load_balancing_parameters,
500            backup: add_backend.backup,
501        };
502        let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
503
504        // we might be modifying the sticky id or load balancing parameters
505        backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
506        backends.push(backend);
507        backends.sort();
508
509        Ok(())
510    }
511
512    fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
513        let backend_list =
514            self.backends
515                .get_mut(&backend.cluster_id)
516                .ok_or(StateError::NotFound {
517                    kind: ObjectKind::Backend,
518                    id: backend.backend_id.to_owned(),
519                })?;
520
521        let len = backend_list.len();
522        let remove_address = backend.address.into();
523        backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
524        backend_list.sort();
525        if backend_list.len() == len {
526            return Err(StateError::NoChange);
527        }
528        Ok(())
529    }
530
531    /// creates all requests needed to bootstrap the state
532    fn generate_requests(&self) -> Vec<Request> {
533        let mut v: Vec<Request> = Vec::new();
534
535        for listener in self.http_listeners.values() {
536            v.push(RequestType::AddHttpListener(listener.clone()).into());
537            if listener.active {
538                v.push(
539                    RequestType::ActivateListener(ActivateListener {
540                        address: listener.address,
541                        proxy: ListenerType::Http.into(),
542                        from_scm: false,
543                    })
544                    .into(),
545                );
546            }
547        }
548
549        for listener in self.https_listeners.values() {
550            v.push(RequestType::AddHttpsListener(listener.clone()).into());
551            if listener.active {
552                v.push(
553                    RequestType::ActivateListener(ActivateListener {
554                        address: listener.address,
555                        proxy: ListenerType::Https.into(),
556                        from_scm: false,
557                    })
558                    .into(),
559                );
560            }
561        }
562
563        for listener in self.tcp_listeners.values() {
564            v.push(RequestType::AddTcpListener(*listener).into());
565            if listener.active {
566                v.push(
567                    RequestType::ActivateListener(ActivateListener {
568                        address: listener.address,
569                        proxy: ListenerType::Tcp.into(),
570                        from_scm: false,
571                    })
572                    .into(),
573                );
574            }
575        }
576
577        for cluster in self.clusters.values() {
578            v.push(RequestType::AddCluster(cluster.clone()).into());
579        }
580
581        for front in self.http_fronts.values() {
582            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
583        }
584
585        for (front, certs) in self.certificates.iter() {
586            for certificate_and_key in certs.values() {
587                v.push(
588                    RequestType::AddCertificate(AddCertificate {
589                        address: SocketAddress::from(*front),
590                        certificate: certificate_and_key.clone(),
591                        expired_at: None,
592                    })
593                    .into(),
594                );
595            }
596        }
597
598        for front in self.https_fronts.values() {
599            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
600        }
601
602        for front_list in self.tcp_fronts.values() {
603            for front in front_list {
604                v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
605            }
606        }
607
608        for backend_list in self.backends.values() {
609            for backend in backend_list {
610                v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
611            }
612        }
613
614        v
615    }
616
617    pub fn generate_activate_requests(&self) -> Vec<Request> {
618        let mut v: Vec<Request> = Vec::new();
619        for front in self
620            .http_listeners
621            .iter()
622            .filter(|(_, listener)| listener.active)
623            .map(|(k, _)| k)
624        {
625            v.push(
626                RequestType::ActivateListener(ActivateListener {
627                    address: SocketAddress::from(*front),
628                    proxy: ListenerType::Http.into(),
629                    from_scm: false,
630                })
631                .into(),
632            );
633        }
634
635        for front in self
636            .https_listeners
637            .iter()
638            .filter(|(_, listener)| listener.active)
639            .map(|(k, _)| k)
640        {
641            v.push(
642                RequestType::ActivateListener(ActivateListener {
643                    address: SocketAddress::from(*front),
644                    proxy: ListenerType::Https.into(),
645                    from_scm: false,
646                })
647                .into(),
648            );
649        }
650        for front in self
651            .tcp_listeners
652            .iter()
653            .filter(|(_, listener)| listener.active)
654            .map(|(k, _)| k)
655        {
656            v.push(
657                RequestType::ActivateListener(ActivateListener {
658                    address: SocketAddress::from(*front),
659                    proxy: ListenerType::Tcp.into(),
660                    from_scm: false,
661                })
662                .into(),
663            );
664        }
665
666        v
667    }
668
669    pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
670        //pub tcp_listeners:   HashMap<SocketAddr, (TcpListener, bool)>,
671        let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
672        let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
673        let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
674        let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);
675
676        let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
677        let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
678        let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
679        let added_http_listeners = their_http_listeners.difference(&my_http_listeners);
680
681        let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
682        let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
683        let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
684        let added_https_listeners = their_https_listeners.difference(&my_https_listeners);
685
686        let mut v: Vec<Request> = vec![];
687
688        for address in removed_tcp_listeners {
689            if self.tcp_listeners[*address].active {
690                v.push(
691                    RequestType::DeactivateListener(DeactivateListener {
692                        address: SocketAddress::from(**address),
693                        proxy: ListenerType::Tcp.into(),
694                        to_scm: false,
695                    })
696                    .into(),
697                );
698            }
699
700            v.push(
701                RequestType::RemoveListener(RemoveListener {
702                    address: SocketAddress::from(**address),
703                    proxy: ListenerType::Tcp.into(),
704                })
705                .into(),
706            );
707        }
708
709        for address in added_tcp_listeners.clone() {
710            v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());
711
712            if other.tcp_listeners[*address].active {
713                v.push(
714                    RequestType::ActivateListener(ActivateListener {
715                        address: SocketAddress::from(**address),
716                        proxy: ListenerType::Tcp.into(),
717                        from_scm: false,
718                    })
719                    .into(),
720                );
721            }
722        }
723
724        for address in removed_http_listeners {
725            if self.http_listeners[*address].active {
726                v.push(
727                    RequestType::DeactivateListener(DeactivateListener {
728                        address: SocketAddress::from(**address),
729                        proxy: ListenerType::Http.into(),
730                        to_scm: false,
731                    })
732                    .into(),
733                );
734            }
735
736            v.push(
737                RequestType::RemoveListener(RemoveListener {
738                    address: SocketAddress::from(**address),
739                    proxy: ListenerType::Http.into(),
740                })
741                .into(),
742            );
743        }
744
745        for address in added_http_listeners.clone() {
746            v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());
747
748            if other.http_listeners[*address].active {
749                v.push(
750                    RequestType::ActivateListener(ActivateListener {
751                        address: SocketAddress::from(**address),
752                        proxy: ListenerType::Http.into(),
753                        from_scm: false,
754                    })
755                    .into(),
756                );
757            }
758        }
759
760        for address in removed_https_listeners {
761            if self.https_listeners[*address].active {
762                v.push(
763                    RequestType::DeactivateListener(DeactivateListener {
764                        address: SocketAddress::from(**address),
765                        proxy: ListenerType::Https.into(),
766                        to_scm: false,
767                    })
768                    .into(),
769                );
770            }
771
772            v.push(
773                RequestType::RemoveListener(RemoveListener {
774                    address: SocketAddress::from(**address),
775                    proxy: ListenerType::Https.into(),
776                })
777                .into(),
778            );
779        }
780
781        for address in added_https_listeners.clone() {
782            v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());
783
784            if other.https_listeners[*address].active {
785                v.push(
786                    RequestType::ActivateListener(ActivateListener {
787                        address: SocketAddress::from(**address),
788                        proxy: ListenerType::Https.into(),
789                        from_scm: false,
790                    })
791                    .into(),
792                );
793            }
794        }
795
796        for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
797            let my_listener = &self.tcp_listeners[*addr];
798            let their_listener = &other.tcp_listeners[*addr];
799
800            if my_listener != their_listener {
801                v.push(
802                    RequestType::RemoveListener(RemoveListener {
803                        address: SocketAddress::from(**addr),
804                        proxy: ListenerType::Tcp.into(),
805                    })
806                    .into(),
807                );
808                // any added listener should be unactive
809                let mut listener_to_add = *their_listener;
810                listener_to_add.active = false;
811                v.push(RequestType::AddTcpListener(listener_to_add).into());
812            }
813
814            if my_listener.active && !their_listener.active {
815                v.push(
816                    RequestType::DeactivateListener(DeactivateListener {
817                        address: SocketAddress::from(**addr),
818                        proxy: ListenerType::Tcp.into(),
819                        to_scm: false,
820                    })
821                    .into(),
822                );
823            }
824
825            if !my_listener.active && their_listener.active {
826                v.push(
827                    RequestType::ActivateListener(ActivateListener {
828                        address: SocketAddress::from(**addr),
829                        proxy: ListenerType::Tcp.into(),
830                        from_scm: false,
831                    })
832                    .into(),
833                );
834            }
835        }
836
837        for addr in my_http_listeners.intersection(&their_http_listeners) {
838            let my_listener = &self.http_listeners[*addr];
839            let their_listener = &other.http_listeners[*addr];
840
841            if my_listener != their_listener {
842                v.push(
843                    RequestType::RemoveListener(RemoveListener {
844                        address: SocketAddress::from(**addr),
845                        proxy: ListenerType::Http.into(),
846                    })
847                    .into(),
848                );
849                // any added listener should be unactive
850                let mut listener_to_add = their_listener.clone();
851                listener_to_add.active = false;
852                v.push(RequestType::AddHttpListener(listener_to_add).into());
853            }
854
855            if my_listener.active && !their_listener.active {
856                v.push(
857                    RequestType::DeactivateListener(DeactivateListener {
858                        address: SocketAddress::from(**addr),
859                        proxy: ListenerType::Http.into(),
860                        to_scm: false,
861                    })
862                    .into(),
863                );
864            }
865
866            if !my_listener.active && their_listener.active {
867                v.push(
868                    RequestType::ActivateListener(ActivateListener {
869                        address: SocketAddress::from(**addr),
870                        proxy: ListenerType::Http.into(),
871                        from_scm: false,
872                    })
873                    .into(),
874                );
875            }
876        }
877
878        for addr in my_https_listeners.intersection(&their_https_listeners) {
879            let my_listener = &self.https_listeners[*addr];
880            let their_listener = &other.https_listeners[*addr];
881
882            if my_listener != their_listener {
883                v.push(
884                    RequestType::RemoveListener(RemoveListener {
885                        address: SocketAddress::from(**addr),
886                        proxy: ListenerType::Https.into(),
887                    })
888                    .into(),
889                );
890                // any added listener should be unactive
891                let mut listener_to_add = their_listener.clone();
892                listener_to_add.active = false;
893                v.push(RequestType::AddHttpsListener(listener_to_add).into());
894            }
895
896            if my_listener.active && !their_listener.active {
897                v.push(
898                    RequestType::DeactivateListener(DeactivateListener {
899                        address: SocketAddress::from(**addr),
900                        proxy: ListenerType::Https.into(),
901                        to_scm: false,
902                    })
903                    .into(),
904                );
905            }
906
907            if !my_listener.active && their_listener.active {
908                v.push(
909                    RequestType::ActivateListener(ActivateListener {
910                        address: SocketAddress::from(**addr),
911                        proxy: ListenerType::Https.into(),
912                        from_scm: false,
913                    })
914                    .into(),
915                );
916            }
917        }
918
919        for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
920            match res {
921                DiffResult::Added | DiffResult::Changed => v.push(
922                    RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
923                ),
924                DiffResult::Removed => {
925                    v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
926                }
927            }
928        }
929
930        for ((cluster_id, backend_id), res) in diff_map(
931            self.backends.iter().flat_map(|(cluster_id, v)| {
932                v.iter()
933                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
934            }),
935            other.backends.iter().flat_map(|(cluster_id, v)| {
936                v.iter()
937                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
938            }),
939        ) {
940            match res {
941                DiffResult::Added => {
942                    let backend = other
943                        .backends
944                        .get(cluster_id)
945                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
946                        .unwrap();
947                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
948                }
949                DiffResult::Removed => {
950                    let backend = self
951                        .backends
952                        .get(cluster_id)
953                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
954                        .unwrap();
955
956                    v.push(
957                        RequestType::RemoveBackend(RemoveBackend {
958                            cluster_id: backend.cluster_id.clone(),
959                            backend_id: backend.backend_id.clone(),
960                            address: SocketAddress::from(backend.address),
961                        })
962                        .into(),
963                    );
964                }
965                DiffResult::Changed => {
966                    let backend = self
967                        .backends
968                        .get(cluster_id)
969                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
970                        .unwrap();
971
972                    v.push(
973                        RequestType::RemoveBackend(RemoveBackend {
974                            cluster_id: backend.cluster_id.clone(),
975                            backend_id: backend.backend_id.clone(),
976                            address: SocketAddress::from(backend.address),
977                        })
978                        .into(),
979                    );
980
981                    let backend = other
982                        .backends
983                        .get(cluster_id)
984                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
985                        .unwrap();
986                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
987                }
988            }
989        }
990
991        let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
992        for (route, front) in self.http_fronts.iter() {
993            my_http_fronts.insert((route, front));
994        }
995        let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
996        for (route, front) in other.http_fronts.iter() {
997            their_http_fronts.insert((route, front));
998        }
999
1000        let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
1001        let added_http_fronts = their_http_fronts.difference(&my_http_fronts);
1002
1003        for &(_, front) in removed_http_fronts {
1004            v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
1005        }
1006
1007        for &(_, front) in added_http_fronts {
1008            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
1009        }
1010
1011        let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1012        for (route, front) in self.https_fronts.iter() {
1013            my_https_fronts.insert((route, front));
1014        }
1015        let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1016        for (route, front) in other.https_fronts.iter() {
1017            their_https_fronts.insert((route, front));
1018        }
1019        let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
1020        let added_https_fronts = their_https_fronts.difference(&my_https_fronts);
1021
1022        for &(_, front) in removed_https_fronts {
1023            v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
1024        }
1025
1026        for &(_, front) in added_https_fronts {
1027            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
1028        }
1029
1030        let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1031        for (cluster_id, front_list) in self.tcp_fronts.iter() {
1032            for front in front_list.iter() {
1033                my_tcp_fronts.insert((cluster_id, front));
1034            }
1035        }
1036        let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1037        for (cluster_id, front_list) in other.tcp_fronts.iter() {
1038            for front in front_list.iter() {
1039                their_tcp_fronts.insert((cluster_id, front));
1040            }
1041        }
1042
1043        let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
1044        let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);
1045
1046        for &(_, front) in removed_tcp_fronts {
1047            v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
1048        }
1049
1050        for &(_, front) in added_tcp_fronts {
1051            v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
1052        }
1053
1054        //pub certificates:    HashMap<SocketAddr, HashMap<CertificateFingerprint, (CertificateAndKey, Vec<String>)>>,
1055        let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1056            self.certificates
1057                .iter()
1058                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1059        );
1060        let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1061            other
1062                .certificates
1063                .iter()
1064                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1065        );
1066
1067        let removed_certificates = my_certificates.difference(&their_certificates);
1068        let added_certificates = their_certificates.difference(&my_certificates);
1069
1070        for &(address, fingerprint) in removed_certificates {
1071            v.push(
1072                RequestType::RemoveCertificate(RemoveCertificate {
1073                    address: SocketAddress::from(address),
1074                    fingerprint: fingerprint.to_string(),
1075                })
1076                .into(),
1077            );
1078        }
1079
1080        for &(address, fingerprint) in added_certificates {
1081            if let Some(certificate_and_key) = other
1082                .certificates
1083                .get(&address)
1084                .and_then(|certs| certs.get(fingerprint))
1085            {
1086                v.push(
1087                    RequestType::AddCertificate(AddCertificate {
1088                        address: SocketAddress::from(address),
1089                        certificate: certificate_and_key.clone(),
1090                        expired_at: None,
1091                    })
1092                    .into(),
1093                );
1094            }
1095        }
1096
1097        for address in added_tcp_listeners {
1098            let listener = &other.tcp_listeners[*address];
1099            if listener.active {
1100                v.push(
1101                    RequestType::ActivateListener(ActivateListener {
1102                        address: listener.address,
1103                        proxy: ListenerType::Tcp.into(),
1104                        from_scm: false,
1105                    })
1106                    .into(),
1107                );
1108            }
1109        }
1110
1111        v
1112    }
1113
1114    // FIXME: what about deny rules?
1115    pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
1116        let mut hm: HashMap<ClusterId, DefaultHasher> = self
1117            .clusters
1118            .keys()
1119            .map(|cluster_id| {
1120                let mut hasher = DefaultHasher::new();
1121                self.clusters.get(cluster_id).hash(&mut hasher);
1122                if let Some(backends) = self.backends.get(cluster_id) {
1123                    backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1124                }
1125                if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
1126                    tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1127                }
1128                (cluster_id.to_owned(), hasher)
1129            })
1130            .collect();
1131
1132        for front in self.http_fronts.values() {
1133            if let Some(cluster_id) = &front.cluster_id {
1134                if let Some(hasher) = hm.get_mut(cluster_id) {
1135                    front.hash(hasher);
1136                }
1137            }
1138        }
1139
1140        for front in self.https_fronts.values() {
1141            if let Some(cluster_id) = &front.cluster_id {
1142                if let Some(hasher) = hm.get_mut(cluster_id) {
1143                    front.hash(hasher);
1144                }
1145            }
1146        }
1147
1148        hm.drain()
1149            .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
1150            .collect()
1151    }
1152
1153    /// Gives details about a given cluster.
1154    /// Types like `HttpFrontend` are converted into protobuf ones, like `RequestHttpFrontend`
1155    pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
1156        let configuration = self.clusters.get(cluster_id).cloned()?;
1157        info!("{:?}", configuration);
1158
1159        let http_frontends: Vec<RequestHttpFrontend> = self
1160            .http_fronts
1161            .values()
1162            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1163            .map(|front| front.clone().into())
1164            .collect();
1165
1166        let https_frontends: Vec<RequestHttpFrontend> = self
1167            .https_fronts
1168            .values()
1169            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1170            .map(|front| front.clone().into())
1171            .collect();
1172
1173        let tcp_frontends: Vec<RequestTcpFrontend> = self
1174            .tcp_fronts
1175            .get(cluster_id)
1176            .cloned()
1177            .unwrap_or_default()
1178            .iter()
1179            .map(|front| front.clone().into())
1180            .collect();
1181
1182        let backends: Vec<AddBackend> = self
1183            .backends
1184            .get(cluster_id)
1185            .cloned()
1186            .unwrap_or_default()
1187            .iter()
1188            .map(|backend| backend.clone().into())
1189            .collect();
1190
1191        Some(ClusterInformation {
1192            configuration: Some(configuration),
1193            http_frontends,
1194            https_frontends,
1195            tcp_frontends,
1196            backends,
1197        })
1198    }
1199
1200    pub fn count_backends(&self) -> usize {
1201        self.backends.values().fold(0, |acc, v| acc + v.len())
1202    }
1203
1204    pub fn count_frontends(&self) -> usize {
1205        self.http_fronts.values().count()
1206            + self.https_fronts.values().count()
1207            + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
1208    }
1209
1210    pub fn get_cluster_ids_by_domain(
1211        &self,
1212        hostname: String,
1213        path: Option<String>,
1214    ) -> HashSet<ClusterId> {
1215        let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
1216
1217        self.http_fronts.values().for_each(|front| {
1218            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1219                if let Some(id) = &front.cluster_id {
1220                    cluster_ids.insert(id.to_string());
1221                }
1222            }
1223        });
1224
1225        self.https_fronts.values().for_each(|front| {
1226            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1227                if let Some(id) = &front.cluster_id {
1228                    cluster_ids.insert(id.to_string());
1229                }
1230            }
1231        });
1232
1233        cluster_ids
1234    }
1235
1236    pub fn get_certificates(
1237        &self,
1238        filters: QueryCertificatesFilters,
1239    ) -> BTreeMap<String, CertificateAndKey> {
1240        self.certificates
1241            .values()
1242            .flat_map(|hash_map| hash_map.iter())
1243            .filter(|(fingerprint, cert)| {
1244                if let Some(domain) = &filters.domain {
1245                    cert.names.contains(domain)
1246                } else if let Some(f) = &filters.fingerprint {
1247                    fingerprint.to_string() == *f
1248                } else {
1249                    true
1250                }
1251            })
1252            .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
1253            .collect()
1254    }
1255
1256    pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
1257        // if no http / https / tcp filter is provided, list all of them
1258        let list_all = !filters.http && !filters.https && !filters.tcp;
1259
1260        let mut listed_frontends = ListedFrontends::default();
1261
1262        if filters.http || list_all {
1263            for http_frontend in self.http_fronts.iter().filter(|f| {
1264                if let Some(domain) = &filters.domain {
1265                    f.1.hostname.contains(domain)
1266                } else {
1267                    true
1268                }
1269            }) {
1270                listed_frontends
1271                    .http_frontends
1272                    .push(http_frontend.1.to_owned().into());
1273            }
1274        }
1275
1276        if filters.https || list_all {
1277            for https_frontend in self.https_fronts.iter().filter(|f| {
1278                if let Some(domain) = &filters.domain {
1279                    f.1.hostname.contains(domain)
1280                } else {
1281                    true
1282                }
1283            }) {
1284                listed_frontends
1285                    .https_frontends
1286                    .push(https_frontend.1.to_owned().into());
1287            }
1288        }
1289
1290        if (filters.tcp || list_all) && filters.domain.is_none() {
1291            for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
1292                listed_frontends
1293                    .tcp_frontends
1294                    .push(tcp_frontend.to_owned().into())
1295            }
1296        }
1297
1298        listed_frontends
1299    }
1300
1301    pub fn list_listeners(&self) -> ListenersList {
1302        ListenersList {
1303            http_listeners: self
1304                .http_listeners
1305                .iter()
1306                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1307                .collect(),
1308            https_listeners: self
1309                .https_listeners
1310                .iter()
1311                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1312                .collect(),
1313            tcp_listeners: self
1314                .tcp_listeners
1315                .iter()
1316                .map(|(addr, listener)| (addr.to_string(), *listener))
1317                .collect(),
1318        }
1319    }
1320
1321    // create requests needed for a worker to recreate the state
1322    pub fn produce_initial_state(&self) -> InitialState {
1323        let mut worker_requests = Vec::new();
1324        for (counter, request) in self.generate_requests().into_iter().enumerate() {
1325            worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
1326        }
1327        InitialState {
1328            requests: worker_requests,
1329        }
1330    }
1331
1332    /// generate requests necessary to recreate the state,
1333    /// in protobuf, to a temp file
1334    pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1335        let initial_state = self.produce_initial_state();
1336        let count = initial_state.requests.len();
1337
1338        let bytes_to_write = initial_state.encode_to_vec();
1339        println!("writing {} in the temp file", bytes_to_write.len());
1340        file.write_all(&bytes_to_write)
1341            .map_err(StateError::FileError)?;
1342
1343        file.sync_all().map_err(StateError::FileError)?;
1344
1345        Ok(count)
1346    }
1347
1348    /// generate requests necessary to recreate the state,
1349    /// write them in a JSON form in a file, separated by \n\0,
1350    /// returns the number of written requests
1351    pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1352        let mut counter = 0usize;
1353        let requests = self.generate_requests();
1354
1355        for request in requests {
1356            let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
1357
1358            file.write_all(
1359                &serde_json::to_string(&message)
1360                    .map(|s| s.into_bytes())
1361                    .unwrap_or_default(),
1362            )
1363            .map_err(StateError::FileError)?;
1364
1365            file.write_all(&b"\n\0"[..])
1366                .map_err(StateError::FileError)?;
1367
1368            if counter % 1000 == 0 {
1369                info!("writing {} commands to file", counter);
1370                file.sync_all().map_err(StateError::FileError)?;
1371            }
1372            counter += 1;
1373        }
1374        file.sync_all().map_err(StateError::FileError)?;
1375
1376        Ok(counter)
1377    }
1378}
1379
1380fn domain_check(
1381    front_hostname: &str,
1382    front_path_rule: &PathRule,
1383    hostname: &str,
1384    path_prefix: &Option<String>,
1385) -> bool {
1386    if hostname != front_hostname {
1387        return false;
1388    }
1389
1390    if let Some(ref path) = &path_prefix {
1391        return path == &front_path_rule.value;
1392    }
1393
1394    true
1395}
1396
1397struct DiffMap<'a, K: Ord, V, I1, I2> {
1398    my_it: I1,
1399    other_it: I2,
1400    my: Option<(K, &'a V)>,
1401    other: Option<(K, &'a V)>,
1402}
1403
1404//fn diff_map<'a, K:Ord, V: PartialEq>(my: &'a BTreeMap<K,V>, other: &'a BTreeMap<K,V>) -> DiffMap<'a,K,V> {
1405fn diff_map<
1406    'a,
1407    K: Ord,
1408    V: PartialEq,
1409    I1: Iterator<Item = (K, &'a V)>,
1410    I2: Iterator<Item = (K, &'a V)>,
1411>(
1412    my: I1,
1413    other: I2,
1414) -> DiffMap<'a, K, V, I1, I2> {
1415    DiffMap {
1416        my_it: my,
1417        other_it: other,
1418        my: None,
1419        other: None,
1420    }
1421}
1422
1423enum DiffResult {
1424    Added,
1425    Removed,
1426    Changed,
1427}
1428
1429// this will iterate over the keys of both iterators
1430// since keys are sorted, it should be easy to see which ones are in common or not
1431impl<
1432        'a,
1433        K: Ord,
1434        V: PartialEq,
1435        I1: Iterator<Item = (K, &'a V)>,
1436        I2: Iterator<Item = (K, &'a V)>,
1437    > std::iter::Iterator for DiffMap<'a, K, V, I1, I2>
1438{
1439    type Item = (K, DiffResult);
1440
1441    fn next(&mut self) -> Option<Self::Item> {
1442        loop {
1443            if self.my.is_none() {
1444                self.my = self.my_it.next();
1445            }
1446            if self.other.is_none() {
1447                self.other = self.other_it.next();
1448            }
1449
1450            match (self.my.take(), self.other.take()) {
1451                // there are no more elements in my_it, all the next elements in other
1452                // should be added
1453                // if other was none, we will stop the iterator there
1454                (None, other) => return other.map(|(k, _)| (k, DiffResult::Added)),
1455                // there are no more elements in other_it, all the next elements in my
1456                // should be removed
1457                (Some((k, _)), None) => return Some((k, DiffResult::Removed)),
1458                // element is present in my but not other
1459                (Some((k1, _v1)), Some((k2, v2))) if k1 < k2 => {
1460                    self.other = Some((k2, v2));
1461                    return Some((k1, DiffResult::Removed));
1462                }
1463                // element is present in other byt not in my
1464                (Some((k1, v1)), Some((k2, _v2))) if k1 > k2 => {
1465                    self.my = Some((k1, v1));
1466                    return Some((k2, DiffResult::Added));
1467                }
1468                (Some((k1, v1)), Some((_k2, v2))) if v1 != v2 => {
1469                    // key is present in both, if elements have changed
1470                    // return a value, otherwise go to the next key for both maps
1471                    return Some((k1, DiffResult::Changed));
1472                }
1473                _ => {}
1474            }
1475        }
1476    }
1477}
1478
1479#[cfg(test)]
1480mod tests {
1481    use rand::{seq::SliceRandom, thread_rng, Rng};
1482
1483    use super::*;
1484    use crate::proto::command::{
1485        CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RulePosition,
1486    };
1487
1488    #[test]
1489    fn serialize() {
1490        let mut state: ConfigState = Default::default();
1491        state
1492            .dispatch(
1493                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1494                    cluster_id: Some(String::from("cluster_1")),
1495                    hostname: String::from("lolcatho.st:8080"),
1496                    path: PathRule::prefix(String::from("/")),
1497                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1498                    position: RulePosition::Tree.into(),
1499                    ..Default::default()
1500                })
1501                .into(),
1502            )
1503            .expect("Could not execute request");
1504        state
1505            .dispatch(
1506                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1507                    cluster_id: Some(String::from("cluster_2")),
1508                    hostname: String::from("test.local"),
1509                    path: PathRule::prefix(String::from("/abc")),
1510                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1511                    position: RulePosition::Pre.into(),
1512                    ..Default::default()
1513                })
1514                .into(),
1515            )
1516            .expect("Could not execute request");
1517        state
1518            .dispatch(
1519                &RequestType::AddBackend(AddBackend {
1520                    cluster_id: String::from("cluster_1"),
1521                    backend_id: String::from("cluster_1-0"),
1522                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1523                    ..Default::default()
1524                })
1525                .into(),
1526            )
1527            .expect("Could not execute request");
1528        state
1529            .dispatch(
1530                &RequestType::AddBackend(AddBackend {
1531                    cluster_id: String::from("cluster_1"),
1532                    backend_id: String::from("cluster_1-1"),
1533                    address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
1534                    ..Default::default()
1535                })
1536                .into(),
1537            )
1538            .expect("Could not execute request");
1539        state
1540            .dispatch(
1541                &RequestType::AddBackend(AddBackend {
1542                    cluster_id: String::from("cluster_2"),
1543                    backend_id: String::from("cluster_2-0"),
1544                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1545                    ..Default::default()
1546                })
1547                .into(),
1548            )
1549            .expect("Could not execute request");
1550        state
1551            .dispatch(
1552                &RequestType::AddBackend(AddBackend {
1553                    cluster_id: String::from("cluster_1"),
1554                    backend_id: String::from("cluster_1-3"),
1555                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
1556                    ..Default::default()
1557                })
1558                .into(),
1559            )
1560            .expect("Could not execute request");
1561        state
1562            .dispatch(
1563                &RequestType::RemoveBackend(RemoveBackend {
1564                    cluster_id: String::from("cluster_1"),
1565                    backend_id: String::from("cluster_1-3"),
1566                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
1567                })
1568                .into(),
1569            )
1570            .expect("Could not execute request");
1571
1572        /*
1573        let encoded = state.encode();
1574        println!("serialized:\n{}", encoded);
1575
1576        let new_state: Option<HttpProxy> = decode_str(&encoded);
1577        println!("deserialized:\n{:?}", new_state);
1578        assert_eq!(new_state, Some(state));
1579        */
1580        //assert!(false);
1581    }
1582
1583    #[test]
1584    fn diff() {
1585        let mut state: ConfigState = Default::default();
1586        state
1587            .dispatch(
1588                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1589                    cluster_id: Some(String::from("cluster_1")),
1590                    hostname: String::from("lolcatho.st:8080"),
1591                    path: PathRule::prefix(String::from("/")),
1592                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1593                    position: RulePosition::Post.into(),
1594                    ..Default::default()
1595                })
1596                .into(),
1597            )
1598            .expect("Could not execute request");
1599        state
1600            .dispatch(
1601                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1602                    cluster_id: Some(String::from("cluster_2")),
1603                    hostname: String::from("test.local"),
1604                    path: PathRule::prefix(String::from("/abc")),
1605                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1606                    ..Default::default()
1607                })
1608                .into(),
1609            )
1610            .expect("Could not execute request");
1611        state
1612            .dispatch(
1613                &RequestType::AddBackend(AddBackend {
1614                    cluster_id: String::from("cluster_1"),
1615                    backend_id: String::from("cluster_1-0"),
1616                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1617                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1618                    ..Default::default()
1619                })
1620                .into(),
1621            )
1622            .expect("Could not execute request");
1623        state
1624            .dispatch(
1625                &RequestType::AddBackend(AddBackend {
1626                    cluster_id: String::from("cluster_1"),
1627                    backend_id: String::from("cluster_1-1"),
1628                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
1629                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1630                    ..Default::default()
1631                })
1632                .into(),
1633            )
1634            .expect("Could not execute request");
1635        state
1636            .dispatch(
1637                &RequestType::AddBackend(AddBackend {
1638                    cluster_id: String::from("cluster_2"),
1639                    backend_id: String::from("cluster_2-0"),
1640                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1641                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1642                    ..Default::default()
1643                })
1644                .into(),
1645            )
1646            .expect("Could not execute request");
1647        state
1648            .dispatch(
1649                &RequestType::AddCluster(Cluster {
1650                    cluster_id: String::from("cluster_2"),
1651                    sticky_session: true,
1652                    https_redirect: true,
1653                    ..Default::default()
1654                })
1655                .into(),
1656            )
1657            .expect("Could not execute request");
1658
1659        let mut state2: ConfigState = Default::default();
1660        state2
1661            .dispatch(
1662                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1663                    cluster_id: Some(String::from("cluster_1")),
1664                    hostname: String::from("lolcatho.st:8080"),
1665                    path: PathRule::prefix(String::from("/")),
1666                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1667                    position: RulePosition::Post.into(),
1668                    ..Default::default()
1669                })
1670                .into(),
1671            )
1672            .expect("Could not execute request");
1673        state2
1674            .dispatch(
1675                &RequestType::AddBackend(AddBackend {
1676                    cluster_id: String::from("cluster_1"),
1677                    backend_id: String::from("cluster_1-0"),
1678                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1679                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1680                    ..Default::default()
1681                })
1682                .into(),
1683            )
1684            .expect("Could not execute request");
1685        state2
1686            .dispatch(
1687                &RequestType::AddBackend(AddBackend {
1688                    cluster_id: String::from("cluster_1"),
1689                    backend_id: String::from("cluster_1-1"),
1690                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
1691                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1692                    ..Default::default()
1693                })
1694                .into(),
1695            )
1696            .expect("Could not execute request");
1697        state2
1698            .dispatch(
1699                &RequestType::AddBackend(AddBackend {
1700                    cluster_id: String::from("cluster_1"),
1701                    backend_id: String::from("cluster_1-2"),
1702                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1703                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1704                    ..Default::default()
1705                })
1706                .into(),
1707            )
1708            .expect("Could not execute request");
1709        state2
1710            .dispatch(
1711                &RequestType::AddCluster(Cluster {
1712                    cluster_id: String::from("cluster_3"),
1713                    sticky_session: false,
1714                    https_redirect: false,
1715                    ..Default::default()
1716                })
1717                .into(),
1718            )
1719            .expect("Could not execute request");
1720
1721        let e: Vec<Request> = vec![
1722            RequestType::RemoveHttpFrontend(RequestHttpFrontend {
1723                cluster_id: Some(String::from("cluster_2")),
1724                hostname: String::from("test.local"),
1725                path: PathRule::prefix(String::from("/abc")),
1726                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1727                ..Default::default()
1728            })
1729            .into(),
1730            RequestType::RemoveBackend(RemoveBackend {
1731                cluster_id: String::from("cluster_2"),
1732                backend_id: String::from("cluster_2-0"),
1733                address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1734            })
1735            .into(),
1736            RequestType::AddBackend(AddBackend {
1737                cluster_id: String::from("cluster_1"),
1738                backend_id: String::from("cluster_1-2"),
1739                address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1740                load_balancing_parameters: Some(LoadBalancingParams::default()),
1741                ..Default::default()
1742            })
1743            .into(),
1744            RequestType::RemoveCluster(String::from("cluster_2")).into(),
1745            RequestType::AddCluster(Cluster {
1746                cluster_id: String::from("cluster_3"),
1747                sticky_session: false,
1748                https_redirect: false,
1749                ..Default::default()
1750            })
1751            .into(),
1752        ];
1753        let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
1754
1755        let d = state.diff(&state2);
1756        let diff = HashSet::from_iter(d.iter());
1757        println!("diff requests:\n{diff:#?}\n");
1758        println!("expected diff requests:\n{expected_diff:#?}\n");
1759
1760        let hash1 = state.hash_state();
1761        let hash2 = state2.hash_state();
1762        let mut state3 = state.clone();
1763        state3
1764            .dispatch(
1765                &RequestType::AddBackend(AddBackend {
1766                    cluster_id: String::from("cluster_1"),
1767                    backend_id: String::from("cluster_1-2"),
1768                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1769                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1770                    ..Default::default()
1771                })
1772                .into(),
1773            )
1774            .expect("Could not execute request");
1775        let hash3 = state3.hash_state();
1776        println!("state 1 hashes: {hash1:#?}");
1777        println!("state 2 hashes: {hash2:#?}");
1778        println!("state 3 hashes: {hash3:#?}");
1779
1780        assert_eq!(diff, expected_diff);
1781    }
1782
1783    #[test]
1784    fn cluster_ids_by_domain() {
1785        let mut config = ConfigState::new();
1786        let http_front_cluster1 = RequestHttpFrontend {
1787            cluster_id: Some(String::from("MyCluster_1")),
1788            hostname: String::from("lolcatho.st"),
1789            path: PathRule::prefix(String::from("")),
1790            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1791            ..Default::default()
1792        };
1793
1794        let https_front_cluster1 = RequestHttpFrontend {
1795            cluster_id: Some(String::from("MyCluster_1")),
1796            hostname: String::from("lolcatho.st"),
1797            path: PathRule::prefix(String::from("")),
1798            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
1799            ..Default::default()
1800        };
1801
1802        let http_front_cluster2 = RequestHttpFrontend {
1803            cluster_id: Some(String::from("MyCluster_2")),
1804            hostname: String::from("lolcatho.st"),
1805            path: PathRule::prefix(String::from("/api")),
1806            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1807            ..Default::default()
1808        };
1809
1810        let https_front_cluster2 = RequestHttpFrontend {
1811            cluster_id: Some(String::from("MyCluster_2")),
1812            hostname: String::from("lolcatho.st"),
1813            path: PathRule::prefix(String::from("/api")),
1814            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
1815            ..Default::default()
1816        };
1817
1818        config
1819            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster1).into())
1820            .expect("Could not execute request");
1821        config
1822            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster2).into())
1823            .expect("Could not execute request");
1824        config
1825            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster1).into())
1826            .expect("Could not execute request");
1827        config
1828            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster2).into())
1829            .expect("Could not execute request");
1830
1831        let mut cluster1_cluster2: HashSet<ClusterId> = HashSet::new();
1832        cluster1_cluster2.insert(String::from("MyCluster_1"));
1833        cluster1_cluster2.insert(String::from("MyCluster_2"));
1834
1835        let mut cluster2: HashSet<ClusterId> = HashSet::new();
1836        cluster2.insert(String::from("MyCluster_2"));
1837
1838        let empty: HashSet<ClusterId> = HashSet::new();
1839        assert_eq!(
1840            config.get_cluster_ids_by_domain(String::from("lolcatho.st"), None),
1841            cluster1_cluster2
1842        );
1843        assert_eq!(
1844            config
1845                .get_cluster_ids_by_domain(String::from("lolcatho.st"), Some(String::from("/api"))),
1846            cluster2
1847        );
1848        assert_eq!(
1849            config.get_cluster_ids_by_domain(String::from("lolcathost"), None),
1850            empty
1851        );
1852        assert_eq!(
1853            config
1854                .get_cluster_ids_by_domain(String::from("lolcathost"), Some(String::from("/sozu"))),
1855            empty
1856        );
1857    }
1858
1859    #[test]
1860    fn duplicate_backends() {
1861        let mut state: ConfigState = Default::default();
1862        state
1863            .dispatch(
1864                &RequestType::AddBackend(AddBackend {
1865                    cluster_id: String::from("cluster_1"),
1866                    backend_id: String::from("cluster_1-0"),
1867                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1868                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1869                    ..Default::default()
1870                })
1871                .into(),
1872            )
1873            .expect("Could not execute request");
1874
1875        let b = Backend {
1876            cluster_id: String::from("cluster_1"),
1877            backend_id: String::from("cluster_1-0"),
1878            address: "127.0.0.1:1026".parse().unwrap(),
1879            load_balancing_parameters: Some(LoadBalancingParams::default()),
1880            sticky_id: Some("sticky".to_string()),
1881            backup: None,
1882        };
1883
1884        state
1885            .dispatch(&RequestType::AddBackend(b.clone().to_add_backend()).into())
1886            .expect("Could not execute order");
1887
1888        assert_eq!(state.backends.get("cluster_1").unwrap(), &vec![b]);
1889    }
1890
1891    #[test]
1892    fn remove_backend() {
1893        let mut state: ConfigState = Default::default();
1894        state
1895            .dispatch(
1896                &RequestType::AddCluster(Cluster {
1897                    cluster_id: String::from("cluster_1"),
1898                    ..Default::default()
1899                })
1900                .into(),
1901            )
1902            .expect("Could not execute request");
1903
1904        for i in 0..10 {
1905            state
1906                .dispatch(
1907                    &RequestType::AddBackend(AddBackend {
1908                        cluster_id: String::from("cluster_1"),
1909                        backend_id: format!("cluster_1-{i}"),
1910                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1911                        ..Default::default()
1912                    })
1913                    .into(),
1914                )
1915                .expect("Could not execute request");
1916        }
1917
1918        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 10);
1919
1920        let remove_backend_2 = RequestType::RemoveBackend(RemoveBackend {
1921            cluster_id: String::from("cluster_1"),
1922            backend_id: String::from("cluster_1-0"),
1923            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1924        })
1925        .into();
1926
1927        let remove_backend_result = state.dispatch(&remove_backend_2);
1928
1929        assert!(remove_backend_result.is_ok());
1930        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
1931
1932        let redundant_remove = state.dispatch(&remove_backend_2);
1933        assert!(matches!(redundant_remove, Err(StateError::NoChange)));
1934        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
1935    }
1936
1937    #[test]
1938    fn remove_backends_randomly() {
1939        let mut state: ConfigState = Default::default();
1940        state
1941            .dispatch(
1942                &RequestType::AddCluster(Cluster {
1943                    cluster_id: String::from("cluster_1"),
1944                    ..Default::default()
1945                })
1946                .into(),
1947            )
1948            .expect("Could not execute request");
1949
1950        for _ in 0..1000 {
1951            for i in 0..10 {
1952                state
1953                    .dispatch(
1954                        &RequestType::AddBackend(AddBackend {
1955                            cluster_id: String::from("cluster_1"),
1956                            backend_id: format!("cluster_1-{i}"),
1957                            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1958                            ..Default::default()
1959                        })
1960                        .into(),
1961                    )
1962                    .expect("Could not execute request");
1963            }
1964
1965            let mut rng = thread_rng();
1966            let mut indexes = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1967            indexes.shuffle(&mut rng);
1968            let random_count = rng.gen_range(1..indexes.len());
1969            let random_indexes: Vec<i32> = indexes.into_iter().take(random_count).collect();
1970
1971            for j in random_indexes {
1972                let remove_backend_result = state.dispatch(
1973                    &RequestType::RemoveBackend(RemoveBackend {
1974                        cluster_id: String::from("cluster_1"),
1975                        backend_id: format!("cluster_1-{j}"),
1976                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1977                    })
1978                    .into(),
1979                );
1980                assert!(remove_backend_result.is_ok());
1981            }
1982        }
1983    }
1984
1985    #[test]
1986    fn listener_diff() {
1987        let mut state: ConfigState = Default::default();
1988        let custom_http_answers = Some(CustomHttpAnswers {
1989            answer_404: Some("test".to_string()),
1990            ..Default::default()
1991        });
1992        state
1993            .dispatch(
1994                &RequestType::AddTcpListener(TcpListenerConfig {
1995                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
1996                    ..Default::default()
1997                })
1998                .into(),
1999            )
2000            .expect("Could not execute request");
2001        state
2002            .dispatch(
2003                &RequestType::ActivateListener(ActivateListener {
2004                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2005                    proxy: ListenerType::Tcp.into(),
2006                    from_scm: false,
2007                })
2008                .into(),
2009            )
2010            .expect("Could not execute request");
2011        state
2012            .dispatch(
2013                &RequestType::AddHttpListener(HttpListenerConfig {
2014                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2015                    ..Default::default()
2016                })
2017                .into(),
2018            )
2019            .expect("Could not execute request");
2020        state
2021            .dispatch(
2022                &RequestType::AddHttpsListener(HttpsListenerConfig {
2023                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2024                    ..Default::default()
2025                })
2026                .into(),
2027            )
2028            .expect("Could not execute request");
2029        state
2030            .dispatch(
2031                &RequestType::ActivateListener(ActivateListener {
2032                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2033                    proxy: ListenerType::Https.into(),
2034                    from_scm: false,
2035                })
2036                .into(),
2037            )
2038            .expect("Could not execute request");
2039
2040        let mut state2: ConfigState = Default::default();
2041        state2
2042            .dispatch(
2043                &RequestType::AddTcpListener(TcpListenerConfig {
2044                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2045                    expect_proxy: true,
2046                    ..Default::default()
2047                })
2048                .into(),
2049            )
2050            .expect("Could not execute request");
2051        state2
2052            .dispatch(
2053                &RequestType::AddHttpListener(HttpListenerConfig {
2054                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2055                    http_answers: custom_http_answers.clone(),
2056                    ..Default::default()
2057                })
2058                .into(),
2059            )
2060            .expect("Could not execute request");
2061        state2
2062            .dispatch(
2063                &RequestType::ActivateListener(ActivateListener {
2064                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2065                    proxy: ListenerType::Http.into(),
2066                    from_scm: false,
2067                })
2068                .into(),
2069            )
2070            .expect("Could not execute request");
2071        state2
2072            .dispatch(
2073                &RequestType::AddHttpsListener(HttpsListenerConfig {
2074                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2075                    http_answers: custom_http_answers.clone(),
2076                    ..Default::default()
2077                })
2078                .into(),
2079            )
2080            .expect("Could not execute request");
2081        state2
2082            .dispatch(
2083                &RequestType::ActivateListener(ActivateListener {
2084                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2085                    proxy: ListenerType::Https.into(),
2086                    from_scm: false,
2087                })
2088                .into(),
2089            )
2090            .expect("Could not execute request");
2091
2092        let e: Vec<Request> = vec![
2093            RequestType::RemoveListener(RemoveListener {
2094                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2095                proxy: ListenerType::Tcp.into(),
2096            })
2097            .into(),
2098            RequestType::AddTcpListener(TcpListenerConfig {
2099                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2100                expect_proxy: true,
2101                ..Default::default()
2102            })
2103            .into(),
2104            RequestType::DeactivateListener(DeactivateListener {
2105                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2106                proxy: ListenerType::Tcp.into(),
2107                to_scm: false,
2108            })
2109            .into(),
2110            RequestType::RemoveListener(RemoveListener {
2111                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2112                proxy: ListenerType::Http.into(),
2113            })
2114            .into(),
2115            RequestType::AddHttpListener(HttpListenerConfig {
2116                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2117                http_answers: custom_http_answers.clone(),
2118                ..Default::default()
2119            })
2120            .into(),
2121            RequestType::ActivateListener(ActivateListener {
2122                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2123                proxy: ListenerType::Http.into(),
2124                from_scm: false,
2125            })
2126            .into(),
2127            RequestType::RemoveListener(RemoveListener {
2128                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2129                proxy: ListenerType::Https.into(),
2130            })
2131            .into(),
2132            RequestType::AddHttpsListener(HttpsListenerConfig {
2133                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2134                http_answers: custom_http_answers.clone(),
2135                ..Default::default()
2136            })
2137            .into(),
2138        ];
2139
2140        let diff = state.diff(&state2);
2141        //let diff: HashSet<&RequestContent> = HashSet::from_iter(d.iter());
2142        println!("expected diff requests:\n{e:#?}\n");
2143        println!("diff requests:\n{diff:#?}\n");
2144
2145        let _hash1 = state.hash_state();
2146        let _hash2 = state2.hash_state();
2147
2148        assert_eq!(diff, e);
2149    }
2150
2151    #[test]
2152    fn certificate_retrieval() {
2153        let mut state: ConfigState = Default::default();
2154        let certificate_and_key = CertificateAndKey {
2155            certificate: String::from(include_str!("../assets/certificate.pem")),
2156            key: String::from(include_str!("../assets/key.pem")),
2157            certificate_chain: vec![],
2158            versions: vec![],
2159            names: vec!["lolcatho.st".to_string()],
2160        };
2161        let add_certificate = AddCertificate {
2162            address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
2163            certificate: certificate_and_key,
2164            expired_at: None,
2165        };
2166        state
2167            .dispatch(&RequestType::AddCertificate(add_certificate).into())
2168            .expect("Could not add certificate");
2169
2170        println!("state: {:#?}", state);
2171
2172        // let fingerprint: Fingerprint = serde_json::from_str(
2173        //     "\"ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5\"",
2174        // )
2175        // .expect("Could not deserialize the fingerprint");
2176
2177        let certificates_found_by_fingerprint = state.get_certificates(QueryCertificatesFilters {
2178            domain: None,
2179            fingerprint: Some(
2180                "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5".to_string(),
2181            ),
2182        });
2183
2184        println!(
2185            "found certificate: {:#?}",
2186            certificates_found_by_fingerprint
2187        );
2188
2189        assert!(!certificates_found_by_fingerprint.is_empty());
2190
2191        let certificate_found_by_domain_name = state.get_certificates(QueryCertificatesFilters {
2192            domain: Some("lolcatho.st".to_string()),
2193            fingerprint: None,
2194        });
2195
2196        assert!(!certificate_found_by_domain_name.is_empty());
2197    }
2198}