detsys_srv/client/
policy.rs1use crate::{resolver::SrvResolver, Error, SrvClient, SrvRecord};
2use arc_swap::ArcSwapOption;
3use async_trait::async_trait;
4use std::sync::Arc;
5use url::Url;
6
7pub use super::Cache;
8
9#[async_trait]
11pub trait Policy: Sized {
12 type CacheItem;
14
15 type Ordering: Iterator<Item = usize>;
17
18 async fn refresh_cache<Resolver: SrvResolver>(
20 &self,
21 client: &SrvClient<Resolver, Self>,
22 ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>>;
23
24 fn order(&self, items: &[Self::CacheItem]) -> Self::Ordering;
27
28 fn cache_item_to_uri(item: &Self::CacheItem) -> &Url;
30
31 #[allow(unused_variables)]
33 fn note_success(&self, url: &Url) {}
34
35 #[allow(unused_variables)]
37 fn note_failure(&self, url: &Url) {}
38}
39
40#[derive(Default)]
43pub struct Affinity {
44 last_working_target: ArcSwapOption<Url>,
45}
46
47#[async_trait]
48impl Policy for Affinity {
49 type CacheItem = Url;
50 type Ordering = AffinityUrlIter;
51
52 async fn refresh_cache<Resolver: SrvResolver>(
53 &self,
54 client: &SrvClient<Resolver, Self>,
55 ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>> {
56 let (uris, valid_until) = client.get_fresh_uri_candidates().await?;
57 Ok(Cache::new(uris, valid_until))
58 }
59
60 fn order(&self, uris: &[Url]) -> Self::Ordering {
61 let preferred = self.last_working_target.load();
62 Affinity::uris_preferring(uris, preferred.as_deref())
63 }
64
65 fn cache_item_to_uri(item: &Self::CacheItem) -> &Url {
66 item
67 }
68
69 fn note_success(&self, uri: &Url) {
70 self.last_working_target.store(Some(Arc::new(uri.clone())));
71 }
72}
73
74impl Affinity {
75 fn uris_preferring(uris: &[Url], preferred: Option<&Url>) -> AffinityUrlIter {
76 let preferred = preferred
77 .and_then(|preferred| uris.as_ref().iter().position(|uri| uri == preferred))
78 .unwrap_or(0);
79 AffinityUrlIter {
80 n: uris.len(),
81 preferred,
82 next: None,
83 }
84 }
85}
86
87pub struct AffinityUrlIter {
89 n: usize,
91 preferred: usize,
94 next: Option<usize>,
97}
98
99impl Iterator for AffinityUrlIter {
100 type Item = usize;
101
102 fn next(&mut self) -> Option<Self::Item> {
103 let (idx, next) = match self.next {
104 None => (self.preferred, 0),
106 Some(next) if next == self.preferred => (next + 1, next + 2),
108 Some(next) => (next, next + 1),
110 };
111 self.next = Some(next);
112 if idx < self.n {
113 Some(idx)
114 } else {
115 None
116 }
117 }
118}
119
120#[derive(Default)]
123pub struct Rfc2782;
124
125pub struct ParsedRecord {
127 uri: Url,
128 priority: u16,
129 weight: u16,
130}
131
132impl ParsedRecord {
133 fn new<Record: SrvRecord>(record: &Record, uri: Url) -> Self {
134 Self {
135 uri,
136 priority: record.priority(),
137 weight: record.weight(),
138 }
139 }
140}
141
142#[async_trait]
143impl Policy for Rfc2782 {
144 type CacheItem = ParsedRecord;
145 type Ordering = <Vec<usize> as IntoIterator>::IntoIter;
146
147 async fn refresh_cache<Resolver: SrvResolver>(
148 &self,
149 client: &SrvClient<Resolver, Self>,
150 ) -> Result<Cache<Self::CacheItem>, Error<Resolver::Error>> {
151 let (records, valid_until) = client.get_srv_records().await?;
152 let parsed = records
153 .iter()
154 .map(|record| {
155 client
156 .parse_record(record)
157 .map(|uri| ParsedRecord::new(record, uri))
158 })
159 .collect::<Result<Vec<_>, _>>()?;
160 Ok(Cache::new(parsed, valid_until))
161 }
162
163 fn order(&self, records: &[ParsedRecord]) -> Self::Ordering {
164 let mut indices = (0..records.len()).collect::<Vec<_>>();
165 let mut rng = rand::rng();
166 indices.sort_by_cached_key(|&idx| {
167 let (priority, weight) = (records[idx].priority, records[idx].weight);
168 crate::record::sort_key(priority, weight, &mut rng)
169 });
170 indices.into_iter()
171 }
172
173 fn cache_item_to_uri(item: &Self::CacheItem) -> &Url {
174 &item.uri
175 }
176}
177
178#[test]
179fn affinity_uris_iter_order() {
180 let google: Url = "https://google.com".parse().unwrap();
181 let amazon: Url = "https://amazon.com".parse().unwrap();
182 let desco: Url = "https://deshaw.com".parse().unwrap();
183 let cache = vec![google.clone(), amazon.clone(), desco.clone()];
184 let order = |preferred| {
185 Affinity::uris_preferring(&cache, preferred)
186 .map(|idx| &cache[idx])
187 .collect::<Vec<_>>()
188 };
189 assert_eq!(order(None), vec![&google, &amazon, &desco]);
190 assert_eq!(order(Some(&google)), vec![&google, &amazon, &desco]);
191 assert_eq!(order(Some(&amazon)), vec![&amazon, &google, &desco]);
192 assert_eq!(order(Some(&desco)), vec![&desco, &google, &amazon]);
193}
194
195#[test]
196fn balance_uris_iter_order() {
197 #[allow(clippy::mutable_key_type)]
200 let mut priorities = std::collections::HashMap::new();
201 priorities.insert("https://google.com".parse::<Url>().unwrap(), 2);
202 priorities.insert("https://cloudflare.com".parse().unwrap(), 2);
203 priorities.insert("https://amazon.com".parse().unwrap(), 1);
204 priorities.insert("https://deshaw.com".parse().unwrap(), 1);
205
206 let cache = priorities
207 .iter()
208 .map(|(uri, &priority)| ParsedRecord {
209 uri: uri.clone(),
210 priority,
211 weight: rand::random::<u8>() as u16,
212 })
213 .collect::<Vec<_>>();
214
215 let ordered = |iter: <Rfc2782 as Policy>::Ordering| {
216 let mut last = None;
217 for item in iter.map(|idx| &cache[idx]) {
218 if let Some(last) = last {
219 assert!(priorities[last] <= priorities[&item.uri]);
220 }
221 last = Some(&item.uri);
222 }
223 };
224
225 for _ in 0..5 {
226 ordered(Rfc2782.order(&cache));
227 }
228}