detsys_srv/client/
policy.rs

1use crate::{resolver::SrvResolver, Error, SrvClient, SrvRecord};
2use arc_swap::ArcSwapOption;
3use async_trait::async_trait;
4use std::sync::Arc;
5use url::Url;
6
7pub use super::Cache;
8
9/// Policy for [`SrvClient`] to use when selecting SRV targets to recommend.
10#[async_trait]
11pub trait Policy: Sized {
12    /// Type of item stored in a client's cache.
13    type CacheItem;
14
15    /// Iterator of indices used to order cache items.
16    type Ordering: Iterator<Item = usize>;
17
18    /// Obtains a refreshed cache for a client.
19    async fn refresh_cache<Resolver: SrvResolver>(
20        &self,
21        client: &SrvClient<Resolver, Self>,
22    ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>>;
23
24    /// Creates an iterator of indices corresponding to cache items in the
25    /// order a [`SrvClient`] should try using them to perform an operation.
26    fn order(&self, items: &[Self::CacheItem]) -> Self::Ordering;
27
28    /// Converts a reference to a cached item into a reference to a [`Url`].
29    fn cache_item_to_uri(item: &Self::CacheItem) -> &Url;
30
31    /// Makes any policy adjustments following a successful execution on `url`.
32    #[allow(unused_variables)]
33    fn note_success(&self, url: &Url) {}
34
35    /// Makes any policy adjustments following a failed execution on `uri`.
36    #[allow(unused_variables)]
37    fn note_failure(&self, url: &Url) {}
38}
39
40/// Policy that selects targets based on past successes--if a target was used
41/// successfully in a past execution, it will be recommended first.
42#[derive(Default)]
43pub struct Affinity {
44    last_working_target: ArcSwapOption<Url>,
45}
46
47#[async_trait]
48impl Policy for Affinity {
49    type CacheItem = Url;
50    type Ordering = AffinityUrlIter;
51
52    async fn refresh_cache<Resolver: SrvResolver>(
53        &self,
54        client: &SrvClient<Resolver, Self>,
55    ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>> {
56        let (uris, valid_until) = client.get_fresh_uri_candidates().await?;
57        Ok(Cache::new(uris, valid_until))
58    }
59
60    fn order(&self, uris: &[Url]) -> Self::Ordering {
61        let preferred = self.last_working_target.load();
62        Affinity::uris_preferring(uris, preferred.as_deref())
63    }
64
65    fn cache_item_to_uri(item: &Self::CacheItem) -> &Url {
66        item
67    }
68
69    fn note_success(&self, uri: &Url) {
70        self.last_working_target.store(Some(Arc::new(uri.clone())));
71    }
72}
73
74impl Affinity {
75    fn uris_preferring(uris: &[Url], preferred: Option<&Url>) -> AffinityUrlIter {
76        let preferred = preferred
77            .and_then(|preferred| uris.as_ref().iter().position(|uri| uri == preferred))
78            .unwrap_or(0);
79        AffinityUrlIter {
80            n: uris.len(),
81            preferred,
82            next: None,
83        }
84    }
85}
86
/// Iterator over cache indices of [`Url`]s based on affinity. See [`Affinity`].
///
/// Yields the preferred index first, then every remaining index in ascending
/// order, never repeating the preferred one.
#[derive(Debug, Clone)]
pub struct AffinityUrlIter {
    /// Number of URIs in the cache.
    n: usize,
    /// Index of the URI to produce first (i.e. the preferred URL).
    /// `0` if the first is preferred or there is no preferred URL at all.
    preferred: usize,
    /// Index of the next URI to be produced.
    /// If `None`, nothing has been produced yet and the preferred URI is next.
    next: Option<usize>,
}

impl Iterator for AffinityUrlIter {
    type Item = usize;

    /// Returns the next cache index, or `None` once every index below `n`
    /// has been produced.
    fn next(&mut self) -> Option<Self::Item> {
        let idx = match self.next {
            // Nothing produced yet: emit the preferred index, then restart
            // the sweep from the front of the cache.
            None => {
                self.next = Some(0);
                self.preferred
            }
            // Sweep in progress: step over the preferred index, since it was
            // already emitted by the very first call.
            Some(cursor) => {
                let idx = if cursor == self.preferred {
                    cursor + 1
                } else {
                    cursor
                };
                self.next = Some(idx + 1);
                idx
            }
        };
        // Any index at or beyond `n` means the sweep ran off the end.
        if idx < self.n {
            Some(idx)
        } else {
            None
        }
    }
}
119
/// Policy that selects targets based on the algorithm in RFC 2782, reshuffling
/// by weight for each selection.
///
/// The policy carries no state of its own.
#[derive(Debug, Clone, Default)]
pub struct Rfc2782;
124
125/// Representation of a SRV record with its target and port parsed into a [`Url`].
126pub struct ParsedRecord {
127    uri: Url,
128    priority: u16,
129    weight: u16,
130}
131
132impl ParsedRecord {
133    fn new<Record: SrvRecord>(record: &Record, uri: Url) -> Self {
134        Self {
135            uri,
136            priority: record.priority(),
137            weight: record.weight(),
138        }
139    }
140}
141
142#[async_trait]
143impl Policy for Rfc2782 {
144    type CacheItem = ParsedRecord;
145    type Ordering = <Vec<usize> as IntoIterator>::IntoIter;
146
147    async fn refresh_cache<Resolver: SrvResolver>(
148        &self,
149        client: &SrvClient<Resolver, Self>,
150    ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>> {
151        let (records, valid_until) = client.get_srv_records().await?;
152        let parsed = records
153            .iter()
154            .map(|record| {
155                client
156                    .parse_record(record)
157                    .map(|uri| ParsedRecord::new(record, uri))
158            })
159            .collect::<Result<Vec<_>, _>>()?;
160        Ok(Cache::new(parsed, valid_until))
161    }
162
163    fn order(&self, records: &[ParsedRecord]) -> Self::Ordering {
164        let mut indices = (0..records.len()).collect::<Vec<_>>();
165        let mut rng = rand::rng();
166        indices.sort_by_cached_key(|&idx| {
167            let (priority, weight) = (records[idx].priority, records[idx].weight);
168            crate::record::sort_key(priority, weight, &mut rng)
169        });
170        indices.into_iter()
171    }
172
173    fn cache_item_to_uri(item: &Self::CacheItem) -> &Url {
174        &item.uri
175    }
176}
177
/// Checks that [`Affinity::uris_preferring`] yields the preferred URL first
/// and the remaining URLs in cache order.
#[test]
fn affinity_uris_iter_order() {
    let parse = |address: &str| address.parse::<Url>().unwrap();
    let google = parse("https://google.com");
    let amazon = parse("https://amazon.com");
    let desco = parse("https://deshaw.com");
    let cache = vec![google.clone(), amazon.clone(), desco.clone()];

    // Resolves the yielded indices back into references into `cache`.
    let order = |preferred| {
        let mut visited = Vec::new();
        for idx in Affinity::uris_preferring(&cache, preferred) {
            visited.push(&cache[idx]);
        }
        visited
    };

    assert_eq!(order(None), vec![&google, &amazon, &desco]);
    assert_eq!(order(Some(&google)), vec![&google, &amazon, &desco]);
    assert_eq!(order(Some(&amazon)), vec![&amazon, &google, &desco]);
    assert_eq!(order(Some(&desco)), vec![&desco, &google, &amazon]);
}
194
/// Checks that [`Rfc2782`] orderings never place a record ahead of one with
/// a numerically lower priority, whatever the (random) weights are.
#[test]
fn balance_uris_iter_order() {
    // Clippy objects to `Url` (which has interior mutability) being used as a
    // `HashMap` key, but the keys are never mutated in this test.
    #[allow(clippy::mutable_key_type)]
    let priorities: std::collections::HashMap<Url, u16> = [
        ("https://google.com", 2),
        ("https://cloudflare.com", 2),
        ("https://amazon.com", 1),
        ("https://deshaw.com", 1),
    ]
    .iter()
    .map(|&(address, priority)| (address.parse::<Url>().unwrap(), priority))
    .collect();

    // One record per URL, each with a random weight.
    let mut cache = Vec::new();
    for (uri, &priority) in &priorities {
        cache.push(ParsedRecord {
            uri: uri.clone(),
            priority,
            weight: u16::from(rand::random::<u8>()),
        });
    }

    // Walks an ordering and asserts that priorities never decrease.
    let ordered = |iter: <Rfc2782 as Policy>::Ordering| {
        let mut previous: Option<&Url> = None;
        for item in iter.map(|idx| &cache[idx]) {
            if let Some(last) = previous.replace(&item.uri) {
                assert!(priorities[last] <= priorities[&item.uri]);
            }
        }
    };

    // Orderings are randomized, so sample several of them.
    (0..5).for_each(|_| ordered(Rfc2782.order(&cache)));
}