pingora_load_balancing/
discovery.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Service discovery interface and implementations
16
17use arc_swap::ArcSwap;
18use async_trait::async_trait;
19use http::Extensions;
20use pingora_core::protocols::l4::socket::SocketAddr;
21use pingora_error::Result;
22use std::io::Result as IoResult;
23use std::net::ToSocketAddrs;
24use std::{
25    collections::{BTreeSet, HashMap},
26    sync::Arc,
27};
28
29use crate::Backend;
30
31/// [ServiceDiscovery] is the interface to discover [Backend]s.
32#[async_trait]
33pub trait ServiceDiscovery {
34    /// Return the discovered collection of backends.
35    /// And *optionally* whether these backends are enabled to serve or not in a `HashMap`. Any backend
36    /// that is not explicitly in the set is considered enabled.
37    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)>;
38}
39
40// TODO: add DNS base discovery
41
42/// A static collection of [Backend]s for service discovery.
43#[derive(Default)]
44pub struct Static {
45    backends: ArcSwap<BTreeSet<Backend>>,
46}
47
48impl Static {
49    /// Create a new boxed [Static] service discovery with the given backends.
50    pub fn new(backends: BTreeSet<Backend>) -> Box<Self> {
51        Box::new(Static {
52            backends: ArcSwap::new(Arc::new(backends)),
53        })
54    }
55
56    /// Create a new boxed [Static] from a given iterator of items that implements [ToSocketAddrs].
57    pub fn try_from_iter<A, T: IntoIterator<Item = A>>(iter: T) -> IoResult<Box<Self>>
58    where
59        A: ToSocketAddrs,
60    {
61        let mut upstreams = BTreeSet::new();
62        for addrs in iter.into_iter() {
63            let addrs = addrs.to_socket_addrs()?.map(|addr| Backend {
64                addr: SocketAddr::Inet(addr),
65                weight: 1,
66                ext: Extensions::new(),
67            });
68            upstreams.extend(addrs);
69        }
70        Ok(Self::new(upstreams))
71    }
72
73    /// return the collection to backends
74    pub fn get(&self) -> BTreeSet<Backend> {
75        BTreeSet::clone(&self.backends.load())
76    }
77
78    // Concurrent set/add/remove might race with each other
79    // TODO: use a queue to avoid racing
80
81    // TODO: take an impl iter
82    #[allow(dead_code)]
83    pub(crate) fn set(&self, backends: BTreeSet<Backend>) {
84        self.backends.store(backends.into())
85    }
86
87    #[allow(dead_code)]
88    pub(crate) fn add(&self, backend: Backend) {
89        let mut new = self.get();
90        new.insert(backend);
91        self.set(new)
92    }
93
94    #[allow(dead_code)]
95    pub(crate) fn remove(&self, backend: &Backend) {
96        let mut new = self.get();
97        new.remove(backend);
98        self.set(new)
99    }
100}
101
102#[async_trait]
103impl ServiceDiscovery for Static {
104    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
105        // no readiness
106        let health = HashMap::new();
107        Ok((self.get(), health))
108    }
109}