sozu_command_lib/
state.rs

1use std::{
2    collections::{
3        BTreeMap, BTreeSet, HashMap, HashSet, btree_map::Entry as BTreeMapEntry,
4        hash_map::DefaultHasher,
5    },
6    fs::File,
7    hash::{Hash, Hasher},
8    io::Write,
9    iter::repeat,
10    net::SocketAddr,
11};
12
13use prost::{Message, UnknownEnumValue};
14
15use crate::{
16    ObjectKind,
17    certificate::{CertificateError, Fingerprint, calculate_fingerprint},
18    proto::{
19        command::{
20            ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster,
21            ClusterInformation, DeactivateListener, FrontendFilters, HttpListenerConfig,
22            HttpsListenerConfig, InitialState, ListedFrontends, ListenerType, ListenersList,
23            PathRule, QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
24            ReplaceCertificate, Request, RequestCounts, RequestHttpFrontend, RequestTcpFrontend,
25            SocketAddress, TcpListenerConfig, WorkerRequest, request::RequestType,
26        },
27        display::format_request_type,
28    },
29    response::{Backend, HttpFrontend, TcpFrontend},
30};
31
32/// To use throughout Sōzu
33pub type ClusterId = String;
34
35#[derive(thiserror::Error, Debug)]
36pub enum StateError {
37    #[error("Request came in empty")]
38    EmptyRequest,
39    #[error("dispatching this request did not bring any change to the state")]
40    NoChange,
41    #[error("State can not handle this request")]
42    UndispatchableRequest,
43    #[error("Did not find {kind:?} with address or id '{id}'")]
44    NotFound { kind: ObjectKind, id: String },
45    #[error("{kind:?} '{id}' already exists")]
46    Exists { kind: ObjectKind, id: String },
47    #[error("Wrong field value: {0}")]
48    WrongFieldValue(UnknownEnumValue),
49    #[error("Could not add certificate: {0}")]
50    AddCertificate(CertificateError),
51    #[error("Could not remove certificate: {0}")]
52    RemoveCertificate(String),
53    #[error("Could not replace certificate: {0}")]
54    ReplaceCertificate(String),
55    #[error(
56        "Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
57    )]
58    FrontendConversion { frontend: String, error: String },
59    #[error("Could not write state to file: {0}")]
60    FileError(std::io::Error),
61}
62
63/// The `ConfigState` represents the state of Sōzu's business, which is to forward traffic
64/// from frontends to backends. Hence, it contains all details about:
65///
66/// - listeners (socket addresses, for TCP and HTTP connections)
67/// - frontends (bind to a listener)
68/// - backends (to forward connections to)
69/// - clusters (routing rules from frontends to backends)
70/// - TLS certificates
71#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct ConfigState {
73    pub clusters: BTreeMap<ClusterId, Cluster>,
74    pub backends: BTreeMap<ClusterId, Vec<Backend>>,
75    /// socket address -> HTTP listener
76    pub http_listeners: BTreeMap<SocketAddr, HttpListenerConfig>,
77    /// socket address -> HTTPS listener
78    pub https_listeners: BTreeMap<SocketAddr, HttpsListenerConfig>,
79    /// socket address -> TCP listener
80    pub tcp_listeners: BTreeMap<SocketAddr, TcpListenerConfig>,
81    /// HTTP frontends, indexed by a summary of each front's address;hostname;path, for uniqueness.
82    /// For example: `"0.0.0.0:8080;lolcatho.st;P/api"`
83    pub http_fronts: BTreeMap<String, HttpFrontend>,
84    /// indexed by (address, hostname, path)
85    pub https_fronts: BTreeMap<String, HttpFrontend>,
86    pub tcp_fronts: HashMap<ClusterId, Vec<TcpFrontend>>,
87    pub certificates: HashMap<SocketAddr, HashMap<Fingerprint, CertificateAndKey>>,
88    /// A census of requests that were received. Name of the request -> number of occurences
89    pub request_counts: BTreeMap<String, i32>,
90}
91
92impl ConfigState {
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    pub fn dispatch(&mut self, request: &Request) -> Result<(), StateError> {
98        let request_type = match &request.request_type {
99            Some(t) => t,
100            None => return Err(StateError::EmptyRequest),
101        };
102
103        self.increment_request_count(request);
104
105        match request_type {
106            RequestType::AddCluster(cluster) => self.add_cluster(cluster),
107            RequestType::RemoveCluster(cluster_id) => self.remove_cluster(cluster_id),
108            RequestType::AddHttpListener(listener) => self.add_http_listener(listener),
109            RequestType::AddHttpsListener(listener) => self.add_https_listener(listener),
110            RequestType::AddTcpListener(listener) => self.add_tcp_listener(listener),
111            RequestType::RemoveListener(remove) => self.remove_listener(remove),
112            RequestType::ActivateListener(activate) => self.activate_listener(activate),
113            RequestType::DeactivateListener(deactivate) => self.deactivate_listener(deactivate),
114            RequestType::AddHttpFrontend(front) => self.add_http_frontend(front),
115            RequestType::RemoveHttpFrontend(front) => self.remove_http_frontend(front),
116            RequestType::AddCertificate(add) => self.add_certificate(add),
117            RequestType::RemoveCertificate(remove) => self.remove_certificate(remove),
118            RequestType::ReplaceCertificate(replace) => self.replace_certificate(replace),
119            RequestType::AddHttpsFrontend(front) => self.add_https_frontend(front),
120            RequestType::RemoveHttpsFrontend(front) => self.remove_https_frontend(front),
121            RequestType::AddTcpFrontend(front) => self.add_tcp_frontend(front),
122            RequestType::RemoveTcpFrontend(front) => self.remove_tcp_frontend(front),
123            RequestType::AddBackend(add_backend) => self.add_backend(add_backend),
124            RequestType::RemoveBackend(backend) => self.remove_backend(backend),
125
126            // This is to avoid the error message
127            RequestType::Logging(_)
128            | RequestType::CountRequests(_)
129            | RequestType::Status(_)
130            | RequestType::SoftStop(_)
131            | RequestType::QueryCertificatesFromWorkers(_)
132            | RequestType::QueryClusterById(_)
133            | RequestType::QueryClustersByDomain(_)
134            | RequestType::QueryMetrics(_)
135            | RequestType::QueryClustersHashes(_)
136            | RequestType::ConfigureMetrics(_)
137            | RequestType::ReturnListenSockets(_)
138            | RequestType::HardStop(_) => Ok(()),
139
140            _other_request => Err(StateError::UndispatchableRequest),
141        }
142    }
143
144    /// Increments the count for this request type
145    fn increment_request_count(&mut self, request: &Request) {
146        if let Some(request_type) = &request.request_type {
147            let count = self
148                .request_counts
149                .entry(format_request_type(request_type).to_owned())
150                .or_insert(1);
151            *count += 1;
152        }
153    }
154
155    pub fn get_request_counts(&self) -> RequestCounts {
156        RequestCounts {
157            map: self.request_counts.clone(),
158        }
159    }
160
161    fn add_cluster(&mut self, cluster: &Cluster) -> Result<(), StateError> {
162        let cluster = cluster.clone();
163        self.clusters.insert(cluster.cluster_id.clone(), cluster);
164        Ok(())
165    }
166
167    fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), StateError> {
168        match self.clusters.remove(cluster_id) {
169            Some(_) => Ok(()),
170            None => Err(StateError::NotFound {
171                kind: ObjectKind::Cluster,
172                id: cluster_id.to_owned(),
173            }),
174        }
175    }
176
177    fn add_http_listener(&mut self, listener: &HttpListenerConfig) -> Result<(), StateError> {
178        let address: SocketAddr = listener.address.into();
179        match self.http_listeners.entry(address) {
180            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
181            BTreeMapEntry::Occupied(_) => {
182                return Err(StateError::Exists {
183                    kind: ObjectKind::HttpListener,
184                    id: address.to_string(),
185                });
186            }
187        };
188        Ok(())
189    }
190
191    fn add_https_listener(&mut self, listener: &HttpsListenerConfig) -> Result<(), StateError> {
192        let address: SocketAddr = listener.address.into();
193        match self.https_listeners.entry(address) {
194            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(listener.clone()),
195            BTreeMapEntry::Occupied(_) => {
196                return Err(StateError::Exists {
197                    kind: ObjectKind::HttpsListener,
198                    id: address.to_string(),
199                });
200            }
201        };
202        Ok(())
203    }
204
205    fn add_tcp_listener(&mut self, listener: &TcpListenerConfig) -> Result<(), StateError> {
206        let address: SocketAddr = listener.address.into();
207        match self.tcp_listeners.entry(address) {
208            BTreeMapEntry::Vacant(vacant_entry) => vacant_entry.insert(*listener),
209            BTreeMapEntry::Occupied(_) => {
210                return Err(StateError::Exists {
211                    kind: ObjectKind::TcpListener,
212                    id: address.to_string(),
213                });
214            }
215        };
216        Ok(())
217    }
218
219    fn remove_listener(&mut self, remove: &RemoveListener) -> Result<(), StateError> {
220        match ListenerType::try_from(remove.proxy).map_err(StateError::WrongFieldValue)? {
221            ListenerType::Http => self.remove_http_listener(&remove.address.into()),
222            ListenerType::Https => self.remove_https_listener(&remove.address.into()),
223            ListenerType::Tcp => self.remove_tcp_listener(&remove.address.into()),
224        }
225    }
226
227    fn remove_http_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
228        if self.http_listeners.remove(address).is_none() {
229            return Err(StateError::NoChange);
230        }
231        Ok(())
232    }
233
234    fn remove_https_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
235        if self.https_listeners.remove(address).is_none() {
236            return Err(StateError::NoChange);
237        }
238        Ok(())
239    }
240
241    fn remove_tcp_listener(&mut self, address: &SocketAddr) -> Result<(), StateError> {
242        if self.tcp_listeners.remove(address).is_none() {
243            return Err(StateError::NoChange);
244        }
245        Ok(())
246    }
247
248    fn activate_listener(&mut self, activate: &ActivateListener) -> Result<(), StateError> {
249        match ListenerType::try_from(activate.proxy).map_err(StateError::WrongFieldValue)? {
250            ListenerType::Http => self
251                .http_listeners
252                .get_mut(&activate.address.into())
253                .map(|listener| listener.active = true)
254                .ok_or(StateError::NotFound {
255                    kind: ObjectKind::HttpListener,
256                    id: activate.address.to_string(),
257                }),
258            ListenerType::Https => self
259                .https_listeners
260                .get_mut(&activate.address.into())
261                .map(|listener| listener.active = true)
262                .ok_or(StateError::NotFound {
263                    kind: ObjectKind::HttpsListener,
264                    id: activate.address.to_string(),
265                }),
266            ListenerType::Tcp => self
267                .tcp_listeners
268                .get_mut(&activate.address.into())
269                .map(|listener| listener.active = true)
270                .ok_or(StateError::NotFound {
271                    kind: ObjectKind::TcpListener,
272                    id: activate.address.to_string(),
273                }),
274        }
275    }
276
277    fn deactivate_listener(&mut self, deactivate: &DeactivateListener) -> Result<(), StateError> {
278        match ListenerType::try_from(deactivate.proxy).map_err(StateError::WrongFieldValue)? {
279            ListenerType::Http => self
280                .http_listeners
281                .get_mut(&deactivate.address.into())
282                .map(|listener| listener.active = false)
283                .ok_or(StateError::NotFound {
284                    kind: ObjectKind::HttpListener,
285                    id: deactivate.address.to_string(),
286                }),
287            ListenerType::Https => self
288                .https_listeners
289                .get_mut(&deactivate.address.into())
290                .map(|listener| listener.active = false)
291                .ok_or(StateError::NotFound {
292                    kind: ObjectKind::HttpsListener,
293                    id: deactivate.address.to_string(),
294                }),
295            ListenerType::Tcp => self
296                .tcp_listeners
297                .get_mut(&deactivate.address.into())
298                .map(|listener| listener.active = false)
299                .ok_or(StateError::NotFound {
300                    kind: ObjectKind::TcpListener,
301                    id: deactivate.address.to_string(),
302                }),
303        }
304    }
305
306    fn add_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
307        let front_as_key = front.to_string();
308
309        match self.http_fronts.entry(front.to_string()) {
310            BTreeMapEntry::Vacant(e) => {
311                e.insert(front.clone().to_frontend().map_err(|into_error| {
312                    StateError::FrontendConversion {
313                        frontend: front_as_key,
314                        error: into_error.to_string(),
315                    }
316                })?)
317            }
318            BTreeMapEntry::Occupied(_) => {
319                return Err(StateError::Exists {
320                    kind: ObjectKind::HttpFrontend,
321                    id: front.to_string(),
322                });
323            }
324        };
325        Ok(())
326    }
327
328    fn add_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
329        let front_as_key = front.to_string();
330
331        match self.https_fronts.entry(front.to_string()) {
332            BTreeMapEntry::Vacant(e) => {
333                e.insert(front.clone().to_frontend().map_err(|into_error| {
334                    StateError::FrontendConversion {
335                        frontend: front_as_key,
336                        error: into_error.to_string(),
337                    }
338                })?)
339            }
340            BTreeMapEntry::Occupied(_) => {
341                return Err(StateError::Exists {
342                    kind: ObjectKind::HttpsFrontend,
343                    id: front.to_string(),
344                });
345            }
346        };
347        Ok(())
348    }
349
350    fn remove_http_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
351        self.http_fronts
352            .remove(&front.to_string())
353            .ok_or(StateError::NotFound {
354                kind: ObjectKind::HttpFrontend,
355                id: front.to_string(),
356            })?;
357        Ok(())
358    }
359
360    fn remove_https_frontend(&mut self, front: &RequestHttpFrontend) -> Result<(), StateError> {
361        self.https_fronts
362            .remove(&front.to_string())
363            .ok_or(StateError::NotFound {
364                kind: ObjectKind::HttpsFrontend,
365                id: front.to_string(),
366            })?;
367        Ok(())
368    }
369
370    fn add_certificate(&mut self, add: &AddCertificate) -> Result<(), StateError> {
371        let fingerprint = add
372            .certificate
373            .fingerprint()
374            .map_err(StateError::AddCertificate)?;
375
376        let entry = self.certificates.entry(add.address.into()).or_default();
377
378        let mut add = add.clone();
379        add.certificate
380            .apply_overriding_names()
381            .map_err(StateError::AddCertificate)?;
382
383        if entry.contains_key(&fingerprint) {
384            info!(
385                "Skip loading of certificate '{}' for domain '{}' on listener '{}', the certificate is already present.",
386                fingerprint,
387                add.certificate.names.join(", "),
388                add.address
389            );
390            return Ok(());
391        }
392
393        entry.insert(fingerprint, add.certificate);
394        Ok(())
395    }
396
397    fn remove_certificate(&mut self, remove: &RemoveCertificate) -> Result<(), StateError> {
398        let fingerprint = Fingerprint(
399            hex::decode(&remove.fingerprint)
400                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
401        );
402
403        if let Some(index) = self.certificates.get_mut(&remove.address.into()) {
404            index.remove(&fingerprint);
405        }
406
407        Ok(())
408    }
409
410    /// - Remove old certificate from certificates, using the old fingerprint
411    /// - calculate the new fingerprint
412    /// - insert the new certificate with the new fingerprint as key
413    /// - check that the new entry is present in the certificates hashmap
414    fn replace_certificate(&mut self, replace: &ReplaceCertificate) -> Result<(), StateError> {
415        let replace_address = replace.address.into();
416        let old_fingerprint = Fingerprint(
417            hex::decode(&replace.old_fingerprint)
418                .map_err(|decode_error| StateError::RemoveCertificate(decode_error.to_string()))?,
419        );
420
421        self.certificates
422            .get_mut(&replace_address)
423            .ok_or(StateError::NotFound {
424                kind: ObjectKind::Certificate,
425                id: replace.address.to_string(),
426            })?
427            .remove(&old_fingerprint);
428
429        let new_fingerprint = Fingerprint(
430            calculate_fingerprint(replace.new_certificate.certificate.as_bytes()).map_err(
431                |fingerprint_err| StateError::ReplaceCertificate(fingerprint_err.to_string()),
432            )?,
433        );
434
435        self.certificates
436            .get_mut(&replace_address)
437            .map(|certs| certs.insert(new_fingerprint.clone(), replace.new_certificate.clone()));
438
439        if !self
440            .certificates
441            .get(&replace_address)
442            .ok_or(StateError::ReplaceCertificate(
443                "Unlikely error. This entry in the certificate hashmap should be present"
444                    .to_string(),
445            ))?
446            .contains_key(&new_fingerprint)
447        {
448            return Err(StateError::ReplaceCertificate(format!(
449                "Failed to insert the new certificate for address {}",
450                replace.address
451            )));
452        }
453        Ok(())
454    }
455
456    fn add_tcp_frontend(&mut self, front: &RequestTcpFrontend) -> Result<(), StateError> {
457        let tcp_frontends = self.tcp_fronts.entry(front.cluster_id.clone()).or_default();
458
459        let tcp_frontend = TcpFrontend {
460            cluster_id: front.cluster_id.clone(),
461            address: front.address.into(),
462            tags: front.tags.clone(),
463        };
464        if tcp_frontends.contains(&tcp_frontend) {
465            return Err(StateError::Exists {
466                kind: ObjectKind::TcpFrontend,
467                id: format!("{:?}", tcp_frontend),
468            });
469        }
470
471        tcp_frontends.push(tcp_frontend);
472        Ok(())
473    }
474
475    fn remove_tcp_frontend(
476        &mut self,
477        front_to_remove: &RequestTcpFrontend,
478    ) -> Result<(), StateError> {
479        let tcp_frontends =
480            self.tcp_fronts
481                .get_mut(&front_to_remove.cluster_id)
482                .ok_or(StateError::NotFound {
483                    kind: ObjectKind::TcpFrontend,
484                    id: format!("{:?}", front_to_remove),
485                })?;
486
487        let len = tcp_frontends.len();
488        tcp_frontends.retain(|front| front.address != front_to_remove.address.into());
489        if tcp_frontends.len() == len {
490            return Err(StateError::NoChange);
491        }
492        Ok(())
493    }
494
495    fn add_backend(&mut self, add_backend: &AddBackend) -> Result<(), StateError> {
496        let backend = Backend {
497            address: add_backend.address.into(),
498            cluster_id: add_backend.cluster_id.clone(),
499            backend_id: add_backend.backend_id.clone(),
500            sticky_id: add_backend.sticky_id.clone(),
501            load_balancing_parameters: add_backend.load_balancing_parameters,
502            backup: add_backend.backup,
503        };
504        let backends = self.backends.entry(backend.cluster_id.clone()).or_default();
505
506        // we might be modifying the sticky id or load balancing parameters
507        backends.retain(|b| b.backend_id != backend.backend_id || b.address != backend.address);
508        backends.push(backend);
509        backends.sort();
510
511        Ok(())
512    }
513
514    fn remove_backend(&mut self, backend: &RemoveBackend) -> Result<(), StateError> {
515        let backend_list =
516            self.backends
517                .get_mut(&backend.cluster_id)
518                .ok_or(StateError::NotFound {
519                    kind: ObjectKind::Backend,
520                    id: backend.backend_id.to_owned(),
521                })?;
522
523        let len = backend_list.len();
524        let remove_address = backend.address.into();
525        backend_list.retain(|b| b.backend_id != backend.backend_id || b.address != remove_address);
526        backend_list.sort();
527        if backend_list.len() == len {
528            return Err(StateError::NoChange);
529        }
530        Ok(())
531    }
532
533    /// creates all requests needed to bootstrap the state
534    fn generate_requests(&self) -> Vec<Request> {
535        let mut v: Vec<Request> = Vec::new();
536
537        for listener in self.http_listeners.values() {
538            v.push(RequestType::AddHttpListener(listener.clone()).into());
539            if listener.active {
540                v.push(
541                    RequestType::ActivateListener(ActivateListener {
542                        address: listener.address,
543                        proxy: ListenerType::Http.into(),
544                        from_scm: false,
545                    })
546                    .into(),
547                );
548            }
549        }
550
551        for listener in self.https_listeners.values() {
552            v.push(RequestType::AddHttpsListener(listener.clone()).into());
553            if listener.active {
554                v.push(
555                    RequestType::ActivateListener(ActivateListener {
556                        address: listener.address,
557                        proxy: ListenerType::Https.into(),
558                        from_scm: false,
559                    })
560                    .into(),
561                );
562            }
563        }
564
565        for listener in self.tcp_listeners.values() {
566            v.push(RequestType::AddTcpListener(*listener).into());
567            if listener.active {
568                v.push(
569                    RequestType::ActivateListener(ActivateListener {
570                        address: listener.address,
571                        proxy: ListenerType::Tcp.into(),
572                        from_scm: false,
573                    })
574                    .into(),
575                );
576            }
577        }
578
579        for cluster in self.clusters.values() {
580            v.push(RequestType::AddCluster(cluster.clone()).into());
581        }
582
583        for front in self.http_fronts.values() {
584            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
585        }
586
587        for (front, certs) in self.certificates.iter() {
588            for certificate_and_key in certs.values() {
589                v.push(
590                    RequestType::AddCertificate(AddCertificate {
591                        address: SocketAddress::from(*front),
592                        certificate: certificate_and_key.clone(),
593                        expired_at: None,
594                    })
595                    .into(),
596                );
597            }
598        }
599
600        for front in self.https_fronts.values() {
601            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
602        }
603
604        for front_list in self.tcp_fronts.values() {
605            for front in front_list {
606                v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
607            }
608        }
609
610        for backend_list in self.backends.values() {
611            for backend in backend_list {
612                v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
613            }
614        }
615
616        v
617    }
618
619    pub fn generate_activate_requests(&self) -> Vec<Request> {
620        let mut v: Vec<Request> = Vec::new();
621        for front in self
622            .http_listeners
623            .iter()
624            .filter(|(_, listener)| listener.active)
625            .map(|(k, _)| k)
626        {
627            v.push(
628                RequestType::ActivateListener(ActivateListener {
629                    address: SocketAddress::from(*front),
630                    proxy: ListenerType::Http.into(),
631                    from_scm: false,
632                })
633                .into(),
634            );
635        }
636
637        for front in self
638            .https_listeners
639            .iter()
640            .filter(|(_, listener)| listener.active)
641            .map(|(k, _)| k)
642        {
643            v.push(
644                RequestType::ActivateListener(ActivateListener {
645                    address: SocketAddress::from(*front),
646                    proxy: ListenerType::Https.into(),
647                    from_scm: false,
648                })
649                .into(),
650            );
651        }
652        for front in self
653            .tcp_listeners
654            .iter()
655            .filter(|(_, listener)| listener.active)
656            .map(|(k, _)| k)
657        {
658            v.push(
659                RequestType::ActivateListener(ActivateListener {
660                    address: SocketAddress::from(*front),
661                    proxy: ListenerType::Tcp.into(),
662                    from_scm: false,
663                })
664                .into(),
665            );
666        }
667
668        v
669    }
670
671    pub fn diff(&self, other: &ConfigState) -> Vec<Request> {
672        //pub tcp_listeners:   HashMap<SocketAddr, (TcpListener, bool)>,
673        let my_tcp_listeners: HashSet<&SocketAddr> = self.tcp_listeners.keys().collect();
674        let their_tcp_listeners: HashSet<&SocketAddr> = other.tcp_listeners.keys().collect();
675        let removed_tcp_listeners = my_tcp_listeners.difference(&their_tcp_listeners);
676        let added_tcp_listeners = their_tcp_listeners.difference(&my_tcp_listeners);
677
678        let my_http_listeners: HashSet<&SocketAddr> = self.http_listeners.keys().collect();
679        let their_http_listeners: HashSet<&SocketAddr> = other.http_listeners.keys().collect();
680        let removed_http_listeners = my_http_listeners.difference(&their_http_listeners);
681        let added_http_listeners = their_http_listeners.difference(&my_http_listeners);
682
683        let my_https_listeners: HashSet<&SocketAddr> = self.https_listeners.keys().collect();
684        let their_https_listeners: HashSet<&SocketAddr> = other.https_listeners.keys().collect();
685        let removed_https_listeners = my_https_listeners.difference(&their_https_listeners);
686        let added_https_listeners = their_https_listeners.difference(&my_https_listeners);
687
688        let mut v: Vec<Request> = vec![];
689
690        for address in removed_tcp_listeners {
691            if self.tcp_listeners[*address].active {
692                v.push(
693                    RequestType::DeactivateListener(DeactivateListener {
694                        address: SocketAddress::from(**address),
695                        proxy: ListenerType::Tcp.into(),
696                        to_scm: false,
697                    })
698                    .into(),
699                );
700            }
701
702            v.push(
703                RequestType::RemoveListener(RemoveListener {
704                    address: SocketAddress::from(**address),
705                    proxy: ListenerType::Tcp.into(),
706                })
707                .into(),
708            );
709        }
710
711        for address in added_tcp_listeners.clone() {
712            v.push(RequestType::AddTcpListener(other.tcp_listeners[*address]).into());
713
714            if other.tcp_listeners[*address].active {
715                v.push(
716                    RequestType::ActivateListener(ActivateListener {
717                        address: SocketAddress::from(**address),
718                        proxy: ListenerType::Tcp.into(),
719                        from_scm: false,
720                    })
721                    .into(),
722                );
723            }
724        }
725
726        for address in removed_http_listeners {
727            if self.http_listeners[*address].active {
728                v.push(
729                    RequestType::DeactivateListener(DeactivateListener {
730                        address: SocketAddress::from(**address),
731                        proxy: ListenerType::Http.into(),
732                        to_scm: false,
733                    })
734                    .into(),
735                );
736            }
737
738            v.push(
739                RequestType::RemoveListener(RemoveListener {
740                    address: SocketAddress::from(**address),
741                    proxy: ListenerType::Http.into(),
742                })
743                .into(),
744            );
745        }
746
747        for address in added_http_listeners.clone() {
748            v.push(RequestType::AddHttpListener(other.http_listeners[*address].clone()).into());
749
750            if other.http_listeners[*address].active {
751                v.push(
752                    RequestType::ActivateListener(ActivateListener {
753                        address: SocketAddress::from(**address),
754                        proxy: ListenerType::Http.into(),
755                        from_scm: false,
756                    })
757                    .into(),
758                );
759            }
760        }
761
762        for address in removed_https_listeners {
763            if self.https_listeners[*address].active {
764                v.push(
765                    RequestType::DeactivateListener(DeactivateListener {
766                        address: SocketAddress::from(**address),
767                        proxy: ListenerType::Https.into(),
768                        to_scm: false,
769                    })
770                    .into(),
771                );
772            }
773
774            v.push(
775                RequestType::RemoveListener(RemoveListener {
776                    address: SocketAddress::from(**address),
777                    proxy: ListenerType::Https.into(),
778                })
779                .into(),
780            );
781        }
782
783        for address in added_https_listeners.clone() {
784            v.push(RequestType::AddHttpsListener(other.https_listeners[*address].clone()).into());
785
786            if other.https_listeners[*address].active {
787                v.push(
788                    RequestType::ActivateListener(ActivateListener {
789                        address: SocketAddress::from(**address),
790                        proxy: ListenerType::Https.into(),
791                        from_scm: false,
792                    })
793                    .into(),
794                );
795            }
796        }
797
798        for addr in my_tcp_listeners.intersection(&their_tcp_listeners) {
799            let my_listener = &self.tcp_listeners[*addr];
800            let their_listener = &other.tcp_listeners[*addr];
801
802            if my_listener != their_listener {
803                v.push(
804                    RequestType::RemoveListener(RemoveListener {
805                        address: SocketAddress::from(**addr),
806                        proxy: ListenerType::Tcp.into(),
807                    })
808                    .into(),
809                );
810                // any added listener should be unactive
811                let mut listener_to_add = *their_listener;
812                listener_to_add.active = false;
813                v.push(RequestType::AddTcpListener(listener_to_add).into());
814            }
815
816            if my_listener.active && !their_listener.active {
817                v.push(
818                    RequestType::DeactivateListener(DeactivateListener {
819                        address: SocketAddress::from(**addr),
820                        proxy: ListenerType::Tcp.into(),
821                        to_scm: false,
822                    })
823                    .into(),
824                );
825            }
826
827            if !my_listener.active && their_listener.active {
828                v.push(
829                    RequestType::ActivateListener(ActivateListener {
830                        address: SocketAddress::from(**addr),
831                        proxy: ListenerType::Tcp.into(),
832                        from_scm: false,
833                    })
834                    .into(),
835                );
836            }
837        }
838
839        for addr in my_http_listeners.intersection(&their_http_listeners) {
840            let my_listener = &self.http_listeners[*addr];
841            let their_listener = &other.http_listeners[*addr];
842
843            if my_listener != their_listener {
844                v.push(
845                    RequestType::RemoveListener(RemoveListener {
846                        address: SocketAddress::from(**addr),
847                        proxy: ListenerType::Http.into(),
848                    })
849                    .into(),
850                );
851                // any added listener should be unactive
852                let mut listener_to_add = their_listener.clone();
853                listener_to_add.active = false;
854                v.push(RequestType::AddHttpListener(listener_to_add).into());
855            }
856
857            if my_listener.active && !their_listener.active {
858                v.push(
859                    RequestType::DeactivateListener(DeactivateListener {
860                        address: SocketAddress::from(**addr),
861                        proxy: ListenerType::Http.into(),
862                        to_scm: false,
863                    })
864                    .into(),
865                );
866            }
867
868            if !my_listener.active && their_listener.active {
869                v.push(
870                    RequestType::ActivateListener(ActivateListener {
871                        address: SocketAddress::from(**addr),
872                        proxy: ListenerType::Http.into(),
873                        from_scm: false,
874                    })
875                    .into(),
876                );
877            }
878        }
879
880        for addr in my_https_listeners.intersection(&their_https_listeners) {
881            let my_listener = &self.https_listeners[*addr];
882            let their_listener = &other.https_listeners[*addr];
883
884            if my_listener != their_listener {
885                v.push(
886                    RequestType::RemoveListener(RemoveListener {
887                        address: SocketAddress::from(**addr),
888                        proxy: ListenerType::Https.into(),
889                    })
890                    .into(),
891                );
892                // any added listener should be unactive
893                let mut listener_to_add = their_listener.clone();
894                listener_to_add.active = false;
895                v.push(RequestType::AddHttpsListener(listener_to_add).into());
896            }
897
898            if my_listener.active && !their_listener.active {
899                v.push(
900                    RequestType::DeactivateListener(DeactivateListener {
901                        address: SocketAddress::from(**addr),
902                        proxy: ListenerType::Https.into(),
903                        to_scm: false,
904                    })
905                    .into(),
906                );
907            }
908
909            if !my_listener.active && their_listener.active {
910                v.push(
911                    RequestType::ActivateListener(ActivateListener {
912                        address: SocketAddress::from(**addr),
913                        proxy: ListenerType::Https.into(),
914                        from_scm: false,
915                    })
916                    .into(),
917                );
918            }
919        }
920
921        for (cluster_id, res) in diff_map(self.clusters.iter(), other.clusters.iter()) {
922            match res {
923                DiffResult::Added | DiffResult::Changed => v.push(
924                    RequestType::AddCluster(other.clusters.get(cluster_id).unwrap().clone()).into(),
925                ),
926                DiffResult::Removed => {
927                    v.push(RequestType::RemoveCluster(cluster_id.to_string()).into())
928                }
929            }
930        }
931
932        for ((cluster_id, backend_id), res) in diff_map(
933            self.backends.iter().flat_map(|(cluster_id, v)| {
934                v.iter()
935                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
936            }),
937            other.backends.iter().flat_map(|(cluster_id, v)| {
938                v.iter()
939                    .map(move |backend| ((cluster_id, &backend.backend_id), backend))
940            }),
941        ) {
942            match res {
943                DiffResult::Added => {
944                    let backend = other
945                        .backends
946                        .get(cluster_id)
947                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
948                        .unwrap();
949                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
950                }
951                DiffResult::Removed => {
952                    let backend = self
953                        .backends
954                        .get(cluster_id)
955                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
956                        .unwrap();
957
958                    v.push(
959                        RequestType::RemoveBackend(RemoveBackend {
960                            cluster_id: backend.cluster_id.clone(),
961                            backend_id: backend.backend_id.clone(),
962                            address: SocketAddress::from(backend.address),
963                        })
964                        .into(),
965                    );
966                }
967                DiffResult::Changed => {
968                    let backend = self
969                        .backends
970                        .get(cluster_id)
971                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
972                        .unwrap();
973
974                    v.push(
975                        RequestType::RemoveBackend(RemoveBackend {
976                            cluster_id: backend.cluster_id.clone(),
977                            backend_id: backend.backend_id.clone(),
978                            address: SocketAddress::from(backend.address),
979                        })
980                        .into(),
981                    );
982
983                    let backend = other
984                        .backends
985                        .get(cluster_id)
986                        .and_then(|v| v.iter().find(|b| &b.backend_id == backend_id))
987                        .unwrap();
988                    v.push(RequestType::AddBackend(backend.clone().to_add_backend()).into());
989                }
990            }
991        }
992
993        let mut my_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
994        for (route, front) in self.http_fronts.iter() {
995            my_http_fronts.insert((route, front));
996        }
997        let mut their_http_fronts: HashSet<(&str, &HttpFrontend)> = HashSet::new();
998        for (route, front) in other.http_fronts.iter() {
999            their_http_fronts.insert((route, front));
1000        }
1001
1002        let removed_http_fronts = my_http_fronts.difference(&their_http_fronts);
1003        let added_http_fronts = their_http_fronts.difference(&my_http_fronts);
1004
1005        for &(_, front) in removed_http_fronts {
1006            v.push(RequestType::RemoveHttpFrontend(front.clone().into()).into());
1007        }
1008
1009        for &(_, front) in added_http_fronts {
1010            v.push(RequestType::AddHttpFrontend(front.clone().into()).into());
1011        }
1012
1013        let mut my_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1014        for (route, front) in self.https_fronts.iter() {
1015            my_https_fronts.insert((route, front));
1016        }
1017        let mut their_https_fronts: HashSet<(&String, &HttpFrontend)> = HashSet::new();
1018        for (route, front) in other.https_fronts.iter() {
1019            their_https_fronts.insert((route, front));
1020        }
1021        let removed_https_fronts = my_https_fronts.difference(&their_https_fronts);
1022        let added_https_fronts = their_https_fronts.difference(&my_https_fronts);
1023
1024        for &(_, front) in removed_https_fronts {
1025            v.push(RequestType::RemoveHttpsFrontend(front.clone().into()).into());
1026        }
1027
1028        for &(_, front) in added_https_fronts {
1029            v.push(RequestType::AddHttpsFrontend(front.clone().into()).into());
1030        }
1031
1032        let mut my_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1033        for (cluster_id, front_list) in self.tcp_fronts.iter() {
1034            for front in front_list.iter() {
1035                my_tcp_fronts.insert((cluster_id, front));
1036            }
1037        }
1038        let mut their_tcp_fronts: HashSet<(&ClusterId, &TcpFrontend)> = HashSet::new();
1039        for (cluster_id, front_list) in other.tcp_fronts.iter() {
1040            for front in front_list.iter() {
1041                their_tcp_fronts.insert((cluster_id, front));
1042            }
1043        }
1044
1045        let removed_tcp_fronts = my_tcp_fronts.difference(&their_tcp_fronts);
1046        let added_tcp_fronts = their_tcp_fronts.difference(&my_tcp_fronts);
1047
1048        for &(_, front) in removed_tcp_fronts {
1049            v.push(RequestType::RemoveTcpFrontend(front.clone().into()).into());
1050        }
1051
1052        for &(_, front) in added_tcp_fronts {
1053            v.push(RequestType::AddTcpFrontend(front.clone().into()).into());
1054        }
1055
1056        //pub certificates:    HashMap<SocketAddr, HashMap<CertificateFingerprint, (CertificateAndKey, Vec<String>)>>,
1057        let my_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1058            self.certificates
1059                .iter()
1060                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1061        );
1062        let their_certificates: HashSet<(SocketAddr, &Fingerprint)> = HashSet::from_iter(
1063            other
1064                .certificates
1065                .iter()
1066                .flat_map(|(addr, certs)| repeat(*addr).zip(certs.keys())),
1067        );
1068
1069        let removed_certificates = my_certificates.difference(&their_certificates);
1070        let added_certificates = their_certificates.difference(&my_certificates);
1071
1072        for &(address, fingerprint) in removed_certificates {
1073            v.push(
1074                RequestType::RemoveCertificate(RemoveCertificate {
1075                    address: SocketAddress::from(address),
1076                    fingerprint: fingerprint.to_string(),
1077                })
1078                .into(),
1079            );
1080        }
1081
1082        for &(address, fingerprint) in added_certificates {
1083            if let Some(certificate_and_key) = other
1084                .certificates
1085                .get(&address)
1086                .and_then(|certs| certs.get(fingerprint))
1087            {
1088                v.push(
1089                    RequestType::AddCertificate(AddCertificate {
1090                        address: SocketAddress::from(address),
1091                        certificate: certificate_and_key.clone(),
1092                        expired_at: None,
1093                    })
1094                    .into(),
1095                );
1096            }
1097        }
1098
1099        for address in added_tcp_listeners {
1100            let listener = &other.tcp_listeners[*address];
1101            if listener.active {
1102                v.push(
1103                    RequestType::ActivateListener(ActivateListener {
1104                        address: listener.address,
1105                        proxy: ListenerType::Tcp.into(),
1106                        from_scm: false,
1107                    })
1108                    .into(),
1109                );
1110            }
1111        }
1112
1113        v
1114    }
1115
1116    // FIXME: what about deny rules?
1117    pub fn hash_state(&self) -> BTreeMap<ClusterId, u64> {
1118        let mut hm: HashMap<ClusterId, DefaultHasher> = self
1119            .clusters
1120            .keys()
1121            .map(|cluster_id| {
1122                let mut hasher = DefaultHasher::new();
1123                self.clusters.get(cluster_id).hash(&mut hasher);
1124                if let Some(backends) = self.backends.get(cluster_id) {
1125                    backends.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1126                }
1127                if let Some(tcp_fronts) = self.tcp_fronts.get(cluster_id) {
1128                    tcp_fronts.iter().collect::<BTreeSet<_>>().hash(&mut hasher)
1129                }
1130                (cluster_id.to_owned(), hasher)
1131            })
1132            .collect();
1133
1134        for front in self.http_fronts.values() {
1135            if let Some(cluster_id) = &front.cluster_id {
1136                if let Some(hasher) = hm.get_mut(cluster_id) {
1137                    front.hash(hasher);
1138                }
1139            }
1140        }
1141
1142        for front in self.https_fronts.values() {
1143            if let Some(cluster_id) = &front.cluster_id {
1144                if let Some(hasher) = hm.get_mut(cluster_id) {
1145                    front.hash(hasher);
1146                }
1147            }
1148        }
1149
1150        hm.drain()
1151            .map(|(cluster_id, hasher)| (cluster_id, hasher.finish()))
1152            .collect()
1153    }
1154
1155    /// Gives details about a given cluster.
1156    /// Types like `HttpFrontend` are converted into protobuf ones, like `RequestHttpFrontend`
1157    pub fn cluster_state(&self, cluster_id: &str) -> Option<ClusterInformation> {
1158        let configuration = self.clusters.get(cluster_id).cloned()?;
1159        info!("{:?}", configuration);
1160
1161        let http_frontends: Vec<RequestHttpFrontend> = self
1162            .http_fronts
1163            .values()
1164            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1165            .map(|front| front.clone().into())
1166            .collect();
1167
1168        let https_frontends: Vec<RequestHttpFrontend> = self
1169            .https_fronts
1170            .values()
1171            .filter(|front| front.cluster_id.as_deref() == Some(cluster_id))
1172            .map(|front| front.clone().into())
1173            .collect();
1174
1175        let tcp_frontends: Vec<RequestTcpFrontend> = self
1176            .tcp_fronts
1177            .get(cluster_id)
1178            .cloned()
1179            .unwrap_or_default()
1180            .iter()
1181            .map(|front| front.clone().into())
1182            .collect();
1183
1184        let backends: Vec<AddBackend> = self
1185            .backends
1186            .get(cluster_id)
1187            .cloned()
1188            .unwrap_or_default()
1189            .iter()
1190            .map(|backend| backend.clone().into())
1191            .collect();
1192
1193        Some(ClusterInformation {
1194            configuration: Some(configuration),
1195            http_frontends,
1196            https_frontends,
1197            tcp_frontends,
1198            backends,
1199        })
1200    }
1201
1202    pub fn count_backends(&self) -> usize {
1203        self.backends.values().fold(0, |acc, v| acc + v.len())
1204    }
1205
1206    pub fn count_frontends(&self) -> usize {
1207        self.http_fronts.values().count()
1208            + self.https_fronts.values().count()
1209            + self.tcp_fronts.values().fold(0, |acc, v| acc + v.len())
1210    }
1211
1212    pub fn get_cluster_ids_by_domain(
1213        &self,
1214        hostname: String,
1215        path: Option<String>,
1216    ) -> HashSet<ClusterId> {
1217        let mut cluster_ids: HashSet<ClusterId> = HashSet::new();
1218
1219        self.http_fronts.values().for_each(|front| {
1220            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1221                if let Some(id) = &front.cluster_id {
1222                    cluster_ids.insert(id.to_string());
1223                }
1224            }
1225        });
1226
1227        self.https_fronts.values().for_each(|front| {
1228            if domain_check(&front.hostname, &front.path, &hostname, &path) {
1229                if let Some(id) = &front.cluster_id {
1230                    cluster_ids.insert(id.to_string());
1231                }
1232            }
1233        });
1234
1235        cluster_ids
1236    }
1237
1238    pub fn get_certificates(
1239        &self,
1240        filters: QueryCertificatesFilters,
1241    ) -> BTreeMap<String, CertificateAndKey> {
1242        self.certificates
1243            .values()
1244            .flat_map(|hash_map| hash_map.iter())
1245            .filter(|(fingerprint, cert)| {
1246                if let Some(domain) = &filters.domain {
1247                    cert.names.contains(domain)
1248                } else if let Some(f) = &filters.fingerprint {
1249                    fingerprint.to_string() == *f
1250                } else {
1251                    true
1252                }
1253            })
1254            .map(|(fingerprint, cert)| (fingerprint.to_string(), cert.to_owned()))
1255            .collect()
1256    }
1257
1258    pub fn list_frontends(&self, filters: FrontendFilters) -> ListedFrontends {
1259        // if no http / https / tcp filter is provided, list all of them
1260        let list_all = !filters.http && !filters.https && !filters.tcp;
1261
1262        let mut listed_frontends = ListedFrontends::default();
1263
1264        if filters.http || list_all {
1265            for http_frontend in self.http_fronts.iter().filter(|f| {
1266                if let Some(domain) = &filters.domain {
1267                    f.1.hostname.contains(domain)
1268                } else {
1269                    true
1270                }
1271            }) {
1272                listed_frontends
1273                    .http_frontends
1274                    .push(http_frontend.1.to_owned().into());
1275            }
1276        }
1277
1278        if filters.https || list_all {
1279            for https_frontend in self.https_fronts.iter().filter(|f| {
1280                if let Some(domain) = &filters.domain {
1281                    f.1.hostname.contains(domain)
1282                } else {
1283                    true
1284                }
1285            }) {
1286                listed_frontends
1287                    .https_frontends
1288                    .push(https_frontend.1.to_owned().into());
1289            }
1290        }
1291
1292        if (filters.tcp || list_all) && filters.domain.is_none() {
1293            for tcp_frontend in self.tcp_fronts.values().flat_map(|v| v.iter()) {
1294                listed_frontends
1295                    .tcp_frontends
1296                    .push(tcp_frontend.to_owned().into())
1297            }
1298        }
1299
1300        listed_frontends
1301    }
1302
1303    pub fn list_listeners(&self) -> ListenersList {
1304        ListenersList {
1305            http_listeners: self
1306                .http_listeners
1307                .iter()
1308                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1309                .collect(),
1310            https_listeners: self
1311                .https_listeners
1312                .iter()
1313                .map(|(addr, listener)| (addr.to_string(), listener.clone()))
1314                .collect(),
1315            tcp_listeners: self
1316                .tcp_listeners
1317                .iter()
1318                .map(|(addr, listener)| (addr.to_string(), *listener))
1319                .collect(),
1320        }
1321    }
1322
1323    // create requests needed for a worker to recreate the state
1324    pub fn produce_initial_state(&self) -> InitialState {
1325        let mut worker_requests = Vec::new();
1326        for (counter, request) in self.generate_requests().into_iter().enumerate() {
1327            worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
1328        }
1329        InitialState {
1330            requests: worker_requests,
1331        }
1332    }
1333
1334    /// generate requests necessary to recreate the state,
1335    /// in protobuf, to a temp file
1336    pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1337        let initial_state = self.produce_initial_state();
1338        let count = initial_state.requests.len();
1339
1340        let bytes_to_write = initial_state.encode_to_vec();
1341        println!("writing {} in the temp file", bytes_to_write.len());
1342        file.write_all(&bytes_to_write)
1343            .map_err(StateError::FileError)?;
1344
1345        file.sync_all().map_err(StateError::FileError)?;
1346
1347        Ok(count)
1348    }
1349
1350    /// generate requests necessary to recreate the state,
1351    /// write them in a JSON form in a file, separated by \n\0,
1352    /// returns the number of written requests
1353    pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
1354        let mut counter = 0usize;
1355        let requests = self.generate_requests();
1356
1357        for request in requests {
1358            let message = WorkerRequest::new(format!("SAVE-{counter}"), request);
1359
1360            file.write_all(
1361                &serde_json::to_string(&message)
1362                    .map(|s| s.into_bytes())
1363                    .unwrap_or_default(),
1364            )
1365            .map_err(StateError::FileError)?;
1366
1367            file.write_all(&b"\n\0"[..])
1368                .map_err(StateError::FileError)?;
1369
1370            if counter % 1000 == 0 {
1371                info!("writing {} commands to file", counter);
1372                file.sync_all().map_err(StateError::FileError)?;
1373            }
1374            counter += 1;
1375        }
1376        file.sync_all().map_err(StateError::FileError)?;
1377
1378        Ok(counter)
1379    }
1380}
1381
1382fn domain_check(
1383    front_hostname: &str,
1384    front_path_rule: &PathRule,
1385    hostname: &str,
1386    path_prefix: &Option<String>,
1387) -> bool {
1388    if hostname != front_hostname {
1389        return false;
1390    }
1391
1392    if let Some(path) = &path_prefix {
1393        return path == &front_path_rule.value;
1394    }
1395
1396    true
1397}
1398
1399struct DiffMap<'a, K: Ord, V, I1, I2> {
1400    my_it: I1,
1401    other_it: I2,
1402    my: Option<(K, &'a V)>,
1403    other: Option<(K, &'a V)>,
1404}
1405
1406//fn diff_map<'a, K:Ord, V: PartialEq>(my: &'a BTreeMap<K,V>, other: &'a BTreeMap<K,V>) -> DiffMap<'a,K,V> {
1407fn diff_map<
1408    'a,
1409    K: Ord,
1410    V: PartialEq,
1411    I1: Iterator<Item = (K, &'a V)>,
1412    I2: Iterator<Item = (K, &'a V)>,
1413>(
1414    my: I1,
1415    other: I2,
1416) -> DiffMap<'a, K, V, I1, I2> {
1417    DiffMap {
1418        my_it: my,
1419        other_it: other,
1420        my: None,
1421        other: None,
1422    }
1423}
1424
1425enum DiffResult {
1426    Added,
1427    Removed,
1428    Changed,
1429}
1430
1431// this will iterate over the keys of both iterators
1432// since keys are sorted, it should be easy to see which ones are in common or not
1433impl<'a, K: Ord, V: PartialEq, I1: Iterator<Item = (K, &'a V)>, I2: Iterator<Item = (K, &'a V)>>
1434    std::iter::Iterator for DiffMap<'a, K, V, I1, I2>
1435{
1436    type Item = (K, DiffResult);
1437
1438    fn next(&mut self) -> Option<Self::Item> {
1439        loop {
1440            if self.my.is_none() {
1441                self.my = self.my_it.next();
1442            }
1443            if self.other.is_none() {
1444                self.other = self.other_it.next();
1445            }
1446
1447            match (self.my.take(), self.other.take()) {
1448                // there are no more elements in my_it, all the next elements in other
1449                // should be added
1450                // if other was none, we will stop the iterator there
1451                (None, other) => return other.map(|(k, _)| (k, DiffResult::Added)),
1452                // there are no more elements in other_it, all the next elements in my
1453                // should be removed
1454                (Some((k, _)), None) => return Some((k, DiffResult::Removed)),
1455                // element is present in my but not other
1456                (Some((k1, _v1)), Some((k2, v2))) if k1 < k2 => {
1457                    self.other = Some((k2, v2));
1458                    return Some((k1, DiffResult::Removed));
1459                }
1460                // element is present in other byt not in my
1461                (Some((k1, v1)), Some((k2, _v2))) if k1 > k2 => {
1462                    self.my = Some((k1, v1));
1463                    return Some((k2, DiffResult::Added));
1464                }
1465                (Some((k1, v1)), Some((_k2, v2))) if v1 != v2 => {
1466                    // key is present in both, if elements have changed
1467                    // return a value, otherwise go to the next key for both maps
1468                    return Some((k1, DiffResult::Changed));
1469                }
1470                _ => {}
1471            }
1472        }
1473    }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478    use rand::{Rng, rng, seq::SliceRandom};
1479
1480    use super::*;
1481    use crate::proto::command::{
1482        CustomHttpAnswers, LoadBalancingParams, RequestHttpFrontend, RulePosition,
1483    };
1484
1485    #[test]
1486    fn serialize() {
1487        let mut state: ConfigState = Default::default();
1488        state
1489            .dispatch(
1490                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1491                    cluster_id: Some(String::from("cluster_1")),
1492                    hostname: String::from("lolcatho.st:8080"),
1493                    path: PathRule::prefix(String::from("/")),
1494                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1495                    position: RulePosition::Tree.into(),
1496                    ..Default::default()
1497                })
1498                .into(),
1499            )
1500            .expect("Could not execute request");
1501        state
1502            .dispatch(
1503                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1504                    cluster_id: Some(String::from("cluster_2")),
1505                    hostname: String::from("test.local"),
1506                    path: PathRule::prefix(String::from("/abc")),
1507                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1508                    position: RulePosition::Pre.into(),
1509                    ..Default::default()
1510                })
1511                .into(),
1512            )
1513            .expect("Could not execute request");
1514        state
1515            .dispatch(
1516                &RequestType::AddBackend(AddBackend {
1517                    cluster_id: String::from("cluster_1"),
1518                    backend_id: String::from("cluster_1-0"),
1519                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1520                    ..Default::default()
1521                })
1522                .into(),
1523            )
1524            .expect("Could not execute request");
1525        state
1526            .dispatch(
1527                &RequestType::AddBackend(AddBackend {
1528                    cluster_id: String::from("cluster_1"),
1529                    backend_id: String::from("cluster_1-1"),
1530                    address: SocketAddress::new_v4(127, 0, 0, 1, 1027),
1531                    ..Default::default()
1532                })
1533                .into(),
1534            )
1535            .expect("Could not execute request");
1536        state
1537            .dispatch(
1538                &RequestType::AddBackend(AddBackend {
1539                    cluster_id: String::from("cluster_2"),
1540                    backend_id: String::from("cluster_2-0"),
1541                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1542                    ..Default::default()
1543                })
1544                .into(),
1545            )
1546            .expect("Could not execute request");
1547        state
1548            .dispatch(
1549                &RequestType::AddBackend(AddBackend {
1550                    cluster_id: String::from("cluster_1"),
1551                    backend_id: String::from("cluster_1-3"),
1552                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
1553                    ..Default::default()
1554                })
1555                .into(),
1556            )
1557            .expect("Could not execute request");
1558        state
1559            .dispatch(
1560                &RequestType::RemoveBackend(RemoveBackend {
1561                    cluster_id: String::from("cluster_1"),
1562                    backend_id: String::from("cluster_1-3"),
1563                    address: SocketAddress::new_v4(192, 168, 1, 3, 1027),
1564                })
1565                .into(),
1566            )
1567            .expect("Could not execute request");
1568
1569        /*
1570        let encoded = state.encode();
1571        println!("serialized:\n{}", encoded);
1572
1573        let new_state: Option<HttpProxy> = decode_str(&encoded);
1574        println!("deserialized:\n{:?}", new_state);
1575        assert_eq!(new_state, Some(state));
1576        */
1577        //assert!(false);
1578    }
1579
1580    #[test]
1581    fn diff() {
1582        let mut state: ConfigState = Default::default();
1583        state
1584            .dispatch(
1585                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1586                    cluster_id: Some(String::from("cluster_1")),
1587                    hostname: String::from("lolcatho.st:8080"),
1588                    path: PathRule::prefix(String::from("/")),
1589                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1590                    position: RulePosition::Post.into(),
1591                    ..Default::default()
1592                })
1593                .into(),
1594            )
1595            .expect("Could not execute request");
1596        state
1597            .dispatch(
1598                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1599                    cluster_id: Some(String::from("cluster_2")),
1600                    hostname: String::from("test.local"),
1601                    path: PathRule::prefix(String::from("/abc")),
1602                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1603                    ..Default::default()
1604                })
1605                .into(),
1606            )
1607            .expect("Could not execute request");
1608        state
1609            .dispatch(
1610                &RequestType::AddBackend(AddBackend {
1611                    cluster_id: String::from("cluster_1"),
1612                    backend_id: String::from("cluster_1-0"),
1613                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1614                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1615                    ..Default::default()
1616                })
1617                .into(),
1618            )
1619            .expect("Could not execute request");
1620        state
1621            .dispatch(
1622                &RequestType::AddBackend(AddBackend {
1623                    cluster_id: String::from("cluster_1"),
1624                    backend_id: String::from("cluster_1-1"),
1625                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
1626                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1627                    ..Default::default()
1628                })
1629                .into(),
1630            )
1631            .expect("Could not execute request");
1632        state
1633            .dispatch(
1634                &RequestType::AddBackend(AddBackend {
1635                    cluster_id: String::from("cluster_2"),
1636                    backend_id: String::from("cluster_2-0"),
1637                    address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1638                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1639                    ..Default::default()
1640                })
1641                .into(),
1642            )
1643            .expect("Could not execute request");
1644        state
1645            .dispatch(
1646                &RequestType::AddCluster(Cluster {
1647                    cluster_id: String::from("cluster_2"),
1648                    sticky_session: true,
1649                    https_redirect: true,
1650                    ..Default::default()
1651                })
1652                .into(),
1653            )
1654            .expect("Could not execute request");
1655
1656        let mut state2: ConfigState = Default::default();
1657        state2
1658            .dispatch(
1659                &RequestType::AddHttpFrontend(RequestHttpFrontend {
1660                    cluster_id: Some(String::from("cluster_1")),
1661                    hostname: String::from("lolcatho.st:8080"),
1662                    path: PathRule::prefix(String::from("/")),
1663                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1664                    position: RulePosition::Post.into(),
1665                    ..Default::default()
1666                })
1667                .into(),
1668            )
1669            .expect("Could not execute request");
1670        state2
1671            .dispatch(
1672                &RequestType::AddBackend(AddBackend {
1673                    cluster_id: String::from("cluster_1"),
1674                    backend_id: String::from("cluster_1-0"),
1675                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1676                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1677                    ..Default::default()
1678                })
1679                .into(),
1680            )
1681            .expect("Could not execute request");
1682        state2
1683            .dispatch(
1684                &RequestType::AddBackend(AddBackend {
1685                    cluster_id: String::from("cluster_1"),
1686                    backend_id: String::from("cluster_1-1"),
1687                    address: SocketAddress::new_v4(127, 0, 0, 2, 1027),
1688                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1689                    ..Default::default()
1690                })
1691                .into(),
1692            )
1693            .expect("Could not execute request");
1694        state2
1695            .dispatch(
1696                &RequestType::AddBackend(AddBackend {
1697                    cluster_id: String::from("cluster_1"),
1698                    backend_id: String::from("cluster_1-2"),
1699                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1700                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1701                    ..Default::default()
1702                })
1703                .into(),
1704            )
1705            .expect("Could not execute request");
1706        state2
1707            .dispatch(
1708                &RequestType::AddCluster(Cluster {
1709                    cluster_id: String::from("cluster_3"),
1710                    sticky_session: false,
1711                    https_redirect: false,
1712                    ..Default::default()
1713                })
1714                .into(),
1715            )
1716            .expect("Could not execute request");
1717
1718        let e: Vec<Request> = vec![
1719            RequestType::RemoveHttpFrontend(RequestHttpFrontend {
1720                cluster_id: Some(String::from("cluster_2")),
1721                hostname: String::from("test.local"),
1722                path: PathRule::prefix(String::from("/abc")),
1723                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1724                ..Default::default()
1725            })
1726            .into(),
1727            RequestType::RemoveBackend(RemoveBackend {
1728                cluster_id: String::from("cluster_2"),
1729                backend_id: String::from("cluster_2-0"),
1730                address: SocketAddress::new_v4(192, 167, 1, 2, 1026),
1731            })
1732            .into(),
1733            RequestType::AddBackend(AddBackend {
1734                cluster_id: String::from("cluster_1"),
1735                backend_id: String::from("cluster_1-2"),
1736                address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1737                load_balancing_parameters: Some(LoadBalancingParams::default()),
1738                ..Default::default()
1739            })
1740            .into(),
1741            RequestType::RemoveCluster(String::from("cluster_2")).into(),
1742            RequestType::AddCluster(Cluster {
1743                cluster_id: String::from("cluster_3"),
1744                sticky_session: false,
1745                https_redirect: false,
1746                ..Default::default()
1747            })
1748            .into(),
1749        ];
1750        let expected_diff: HashSet<&Request> = HashSet::from_iter(e.iter());
1751
1752        let d = state.diff(&state2);
1753        let diff = HashSet::from_iter(d.iter());
1754        println!("diff requests:\n{diff:#?}\n");
1755        println!("expected diff requests:\n{expected_diff:#?}\n");
1756
1757        let hash1 = state.hash_state();
1758        let hash2 = state2.hash_state();
1759        let mut state3 = state.clone();
1760        state3
1761            .dispatch(
1762                &RequestType::AddBackend(AddBackend {
1763                    cluster_id: String::from("cluster_1"),
1764                    backend_id: String::from("cluster_1-2"),
1765                    address: SocketAddress::new_v4(127, 0, 0, 2, 1028),
1766                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1767                    ..Default::default()
1768                })
1769                .into(),
1770            )
1771            .expect("Could not execute request");
1772        let hash3 = state3.hash_state();
1773        println!("state 1 hashes: {hash1:#?}");
1774        println!("state 2 hashes: {hash2:#?}");
1775        println!("state 3 hashes: {hash3:#?}");
1776
1777        assert_eq!(diff, expected_diff);
1778    }
1779
1780    #[test]
1781    fn cluster_ids_by_domain() {
1782        let mut config = ConfigState::new();
1783        let http_front_cluster1 = RequestHttpFrontend {
1784            cluster_id: Some(String::from("MyCluster_1")),
1785            hostname: String::from("lolcatho.st"),
1786            path: PathRule::prefix(String::from("")),
1787            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1788            ..Default::default()
1789        };
1790
1791        let https_front_cluster1 = RequestHttpFrontend {
1792            cluster_id: Some(String::from("MyCluster_1")),
1793            hostname: String::from("lolcatho.st"),
1794            path: PathRule::prefix(String::from("")),
1795            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
1796            ..Default::default()
1797        };
1798
1799        let http_front_cluster2 = RequestHttpFrontend {
1800            cluster_id: Some(String::from("MyCluster_2")),
1801            hostname: String::from("lolcatho.st"),
1802            path: PathRule::prefix(String::from("/api")),
1803            address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
1804            ..Default::default()
1805        };
1806
1807        let https_front_cluster2 = RequestHttpFrontend {
1808            cluster_id: Some(String::from("MyCluster_2")),
1809            hostname: String::from("lolcatho.st"),
1810            path: PathRule::prefix(String::from("/api")),
1811            address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
1812            ..Default::default()
1813        };
1814
1815        config
1816            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster1).into())
1817            .expect("Could not execute request");
1818        config
1819            .dispatch(&RequestType::AddHttpFrontend(http_front_cluster2).into())
1820            .expect("Could not execute request");
1821        config
1822            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster1).into())
1823            .expect("Could not execute request");
1824        config
1825            .dispatch(&RequestType::AddHttpsFrontend(https_front_cluster2).into())
1826            .expect("Could not execute request");
1827
1828        let mut cluster1_cluster2: HashSet<ClusterId> = HashSet::new();
1829        cluster1_cluster2.insert(String::from("MyCluster_1"));
1830        cluster1_cluster2.insert(String::from("MyCluster_2"));
1831
1832        let mut cluster2: HashSet<ClusterId> = HashSet::new();
1833        cluster2.insert(String::from("MyCluster_2"));
1834
1835        let empty: HashSet<ClusterId> = HashSet::new();
1836        assert_eq!(
1837            config.get_cluster_ids_by_domain(String::from("lolcatho.st"), None),
1838            cluster1_cluster2
1839        );
1840        assert_eq!(
1841            config
1842                .get_cluster_ids_by_domain(String::from("lolcatho.st"), Some(String::from("/api"))),
1843            cluster2
1844        );
1845        assert_eq!(
1846            config.get_cluster_ids_by_domain(String::from("lolcathost"), None),
1847            empty
1848        );
1849        assert_eq!(
1850            config
1851                .get_cluster_ids_by_domain(String::from("lolcathost"), Some(String::from("/sozu"))),
1852            empty
1853        );
1854    }
1855
1856    #[test]
1857    fn duplicate_backends() {
1858        let mut state: ConfigState = Default::default();
1859        state
1860            .dispatch(
1861                &RequestType::AddBackend(AddBackend {
1862                    cluster_id: String::from("cluster_1"),
1863                    backend_id: String::from("cluster_1-0"),
1864                    address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1865                    load_balancing_parameters: Some(LoadBalancingParams::default()),
1866                    ..Default::default()
1867                })
1868                .into(),
1869            )
1870            .expect("Could not execute request");
1871
1872        let b = Backend {
1873            cluster_id: String::from("cluster_1"),
1874            backend_id: String::from("cluster_1-0"),
1875            address: "127.0.0.1:1026".parse().unwrap(),
1876            load_balancing_parameters: Some(LoadBalancingParams::default()),
1877            sticky_id: Some("sticky".to_string()),
1878            backup: None,
1879        };
1880
1881        state
1882            .dispatch(&RequestType::AddBackend(b.clone().to_add_backend()).into())
1883            .expect("Could not execute order");
1884
1885        assert_eq!(state.backends.get("cluster_1").unwrap(), &vec![b]);
1886    }
1887
1888    #[test]
1889    fn remove_backend() {
1890        let mut state: ConfigState = Default::default();
1891        state
1892            .dispatch(
1893                &RequestType::AddCluster(Cluster {
1894                    cluster_id: String::from("cluster_1"),
1895                    ..Default::default()
1896                })
1897                .into(),
1898            )
1899            .expect("Could not execute request");
1900
1901        for i in 0..10 {
1902            state
1903                .dispatch(
1904                    &RequestType::AddBackend(AddBackend {
1905                        cluster_id: String::from("cluster_1"),
1906                        backend_id: format!("cluster_1-{i}"),
1907                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1908                        ..Default::default()
1909                    })
1910                    .into(),
1911                )
1912                .expect("Could not execute request");
1913        }
1914
1915        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 10);
1916
1917        let remove_backend_2 = RequestType::RemoveBackend(RemoveBackend {
1918            cluster_id: String::from("cluster_1"),
1919            backend_id: String::from("cluster_1-0"),
1920            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1921        })
1922        .into();
1923
1924        let remove_backend_result = state.dispatch(&remove_backend_2);
1925
1926        assert!(remove_backend_result.is_ok());
1927        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
1928
1929        let redundant_remove = state.dispatch(&remove_backend_2);
1930        assert!(matches!(redundant_remove, Err(StateError::NoChange)));
1931        assert_eq!(state.backends.get("cluster_1").unwrap().len(), 9);
1932    }
1933
1934    #[test]
1935    fn remove_backends_randomly() {
1936        let mut state: ConfigState = Default::default();
1937        state
1938            .dispatch(
1939                &RequestType::AddCluster(Cluster {
1940                    cluster_id: String::from("cluster_1"),
1941                    ..Default::default()
1942                })
1943                .into(),
1944            )
1945            .expect("Could not execute request");
1946
1947        for _ in 0..1000 {
1948            for i in 0..10 {
1949                state
1950                    .dispatch(
1951                        &RequestType::AddBackend(AddBackend {
1952                            cluster_id: String::from("cluster_1"),
1953                            backend_id: format!("cluster_1-{i}"),
1954                            address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1955                            ..Default::default()
1956                        })
1957                        .into(),
1958                    )
1959                    .expect("Could not execute request");
1960            }
1961
1962            let mut rng = rng();
1963            let mut indexes = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1964            indexes.shuffle(&mut rng);
1965            let random_count = rng.random_range(1..indexes.len());
1966            let random_indexes: Vec<i32> = indexes.into_iter().take(random_count).collect();
1967
1968            for j in random_indexes {
1969                let remove_backend_result = state.dispatch(
1970                    &RequestType::RemoveBackend(RemoveBackend {
1971                        cluster_id: String::from("cluster_1"),
1972                        backend_id: format!("cluster_1-{j}"),
1973                        address: SocketAddress::new_v4(127, 0, 0, 1, 1026),
1974                    })
1975                    .into(),
1976                );
1977                assert!(remove_backend_result.is_ok());
1978            }
1979        }
1980    }
1981
1982    #[test]
1983    fn listener_diff() {
1984        let mut state: ConfigState = Default::default();
1985        let custom_http_answers = Some(CustomHttpAnswers {
1986            answer_404: Some("test".to_string()),
1987            ..Default::default()
1988        });
1989        state
1990            .dispatch(
1991                &RequestType::AddTcpListener(TcpListenerConfig {
1992                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
1993                    ..Default::default()
1994                })
1995                .into(),
1996            )
1997            .expect("Could not execute request");
1998        state
1999            .dispatch(
2000                &RequestType::ActivateListener(ActivateListener {
2001                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2002                    proxy: ListenerType::Tcp.into(),
2003                    from_scm: false,
2004                })
2005                .into(),
2006            )
2007            .expect("Could not execute request");
2008        state
2009            .dispatch(
2010                &RequestType::AddHttpListener(HttpListenerConfig {
2011                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2012                    ..Default::default()
2013                })
2014                .into(),
2015            )
2016            .expect("Could not execute request");
2017        state
2018            .dispatch(
2019                &RequestType::AddHttpsListener(HttpsListenerConfig {
2020                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2021                    ..Default::default()
2022                })
2023                .into(),
2024            )
2025            .expect("Could not execute request");
2026        state
2027            .dispatch(
2028                &RequestType::ActivateListener(ActivateListener {
2029                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2030                    proxy: ListenerType::Https.into(),
2031                    from_scm: false,
2032                })
2033                .into(),
2034            )
2035            .expect("Could not execute request");
2036
2037        let mut state2: ConfigState = Default::default();
2038        state2
2039            .dispatch(
2040                &RequestType::AddTcpListener(TcpListenerConfig {
2041                    address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2042                    expect_proxy: true,
2043                    ..Default::default()
2044                })
2045                .into(),
2046            )
2047            .expect("Could not execute request");
2048        state2
2049            .dispatch(
2050                &RequestType::AddHttpListener(HttpListenerConfig {
2051                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2052                    http_answers: custom_http_answers.clone(),
2053                    ..Default::default()
2054                })
2055                .into(),
2056            )
2057            .expect("Could not execute request");
2058        state2
2059            .dispatch(
2060                &RequestType::ActivateListener(ActivateListener {
2061                    address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2062                    proxy: ListenerType::Http.into(),
2063                    from_scm: false,
2064                })
2065                .into(),
2066            )
2067            .expect("Could not execute request");
2068        state2
2069            .dispatch(
2070                &RequestType::AddHttpsListener(HttpsListenerConfig {
2071                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2072                    http_answers: custom_http_answers.clone(),
2073                    ..Default::default()
2074                })
2075                .into(),
2076            )
2077            .expect("Could not execute request");
2078        state2
2079            .dispatch(
2080                &RequestType::ActivateListener(ActivateListener {
2081                    address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2082                    proxy: ListenerType::Https.into(),
2083                    from_scm: false,
2084                })
2085                .into(),
2086            )
2087            .expect("Could not execute request");
2088
2089        let e: Vec<Request> = vec![
2090            RequestType::RemoveListener(RemoveListener {
2091                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2092                proxy: ListenerType::Tcp.into(),
2093            })
2094            .into(),
2095            RequestType::AddTcpListener(TcpListenerConfig {
2096                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2097                expect_proxy: true,
2098                ..Default::default()
2099            })
2100            .into(),
2101            RequestType::DeactivateListener(DeactivateListener {
2102                address: SocketAddress::new_v4(0, 0, 0, 0, 1234),
2103                proxy: ListenerType::Tcp.into(),
2104                to_scm: false,
2105            })
2106            .into(),
2107            RequestType::RemoveListener(RemoveListener {
2108                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2109                proxy: ListenerType::Http.into(),
2110            })
2111            .into(),
2112            RequestType::AddHttpListener(HttpListenerConfig {
2113                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2114                http_answers: custom_http_answers.clone(),
2115                ..Default::default()
2116            })
2117            .into(),
2118            RequestType::ActivateListener(ActivateListener {
2119                address: SocketAddress::new_v4(0, 0, 0, 0, 8080),
2120                proxy: ListenerType::Http.into(),
2121                from_scm: false,
2122            })
2123            .into(),
2124            RequestType::RemoveListener(RemoveListener {
2125                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2126                proxy: ListenerType::Https.into(),
2127            })
2128            .into(),
2129            RequestType::AddHttpsListener(HttpsListenerConfig {
2130                address: SocketAddress::new_v4(0, 0, 0, 0, 8443),
2131                http_answers: custom_http_answers.clone(),
2132                ..Default::default()
2133            })
2134            .into(),
2135        ];
2136
2137        let diff = state.diff(&state2);
2138        //let diff: HashSet<&RequestContent> = HashSet::from_iter(d.iter());
2139        println!("expected diff requests:\n{e:#?}\n");
2140        println!("diff requests:\n{diff:#?}\n");
2141
2142        let _hash1 = state.hash_state();
2143        let _hash2 = state2.hash_state();
2144
2145        assert_eq!(diff, e);
2146    }
2147
2148    #[test]
2149    fn certificate_retrieval() {
2150        let mut state: ConfigState = Default::default();
2151        let certificate_and_key = CertificateAndKey {
2152            certificate: String::from(include_str!("../assets/certificate.pem")),
2153            key: String::from(include_str!("../assets/key.pem")),
2154            certificate_chain: vec![],
2155            versions: vec![],
2156            names: vec!["lolcatho.st".to_string()],
2157        };
2158        let add_certificate = AddCertificate {
2159            address: SocketAddress::new_v4(127, 0, 0, 1, 8080),
2160            certificate: certificate_and_key,
2161            expired_at: None,
2162        };
2163        state
2164            .dispatch(&RequestType::AddCertificate(add_certificate).into())
2165            .expect("Could not add certificate");
2166
2167        println!("state: {:#?}", state);
2168
2169        // let fingerprint: Fingerprint = serde_json::from_str(
2170        //     "\"ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5\"",
2171        // )
2172        // .expect("Could not deserialize the fingerprint");
2173
2174        let certificates_found_by_fingerprint = state.get_certificates(QueryCertificatesFilters {
2175            domain: None,
2176            fingerprint: Some(
2177                "ab2618b674e15243fd02a5618c66509e4840ba60e7d64cebec84cdbfeceee0c5".to_string(),
2178            ),
2179        });
2180
2181        println!(
2182            "found certificate: {:#?}",
2183            certificates_found_by_fingerprint
2184        );
2185
2186        assert!(!certificates_found_by_fingerprint.is_empty());
2187
2188        let certificate_found_by_domain_name = state.get_certificates(QueryCertificatesFilters {
2189            domain: Some("lolcatho.st".to_string()),
2190            fingerprint: None,
2191        });
2192
2193        assert!(!certificate_found_by_domain_name.is_empty());
2194    }
2195}