gatel_core/proxy/
srv_upstream.rs1use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::RwLock;
11use tracing::{debug, warn};
12
13pub type SrvBackends = Arc<RwLock<Vec<(String, u16)>>>;
16
17pub struct SrvResolver {
20 _task: tokio::task::JoinHandle<()>,
21 pub backends: SrvBackends,
23}
24
25impl SrvResolver {
26 pub fn start(service_name: String, refresh_interval: Duration) -> Self {
29 let backends: SrvBackends = Arc::new(RwLock::new(Vec::new()));
30 let store = Arc::clone(&backends);
31 let name = service_name.clone();
32
33 let task = tokio::spawn(async move {
34 let dns = hickory_resolver::TokioResolver::builder_tokio()
37 .unwrap_or_else(|_| {
38 hickory_resolver::TokioResolver::builder_with_config(
39 hickory_resolver::config::ResolverConfig::default(),
40 hickory_resolver::name_server::TokioConnectionProvider::default(),
41 )
42 })
43 .build();
44
45 resolve_and_update(&name, &dns, &store).await;
48
49 loop {
50 tokio::time::sleep(refresh_interval).await;
51 resolve_and_update(&name, &dns, &store).await;
52 }
53 });
54
55 Self {
56 _task: task,
57 backends,
58 }
59 }
60}
61
62impl Drop for SrvResolver {
63 fn drop(&mut self) {
64 self._task.abort();
65 }
66}
67
68async fn resolve_and_update(
70 service_name: &str,
71 dns: &hickory_resolver::TokioResolver,
72 store: &SrvBackends,
73) {
74 match dns.srv_lookup(service_name).await {
75 Ok(records) => {
76 let mut entries: Vec<(String, u16, u16, u16)> = records
78 .iter()
79 .map(|r| {
80 let host = r.target().to_string().trim_end_matches('.').to_string();
81 (host, r.port(), r.priority(), r.weight())
82 })
83 .collect();
84
85 if let Some(min_priority) = entries.iter().map(|e| e.2).min() {
87 entries.retain(|e| e.2 == min_priority);
88 }
89
90 let resolved: Vec<(String, u16)> =
91 entries.into_iter().map(|(h, p, ..)| (h, p)).collect();
92
93 if resolved.is_empty() {
94 warn!(
95 service = %service_name,
96 "SRV lookup returned zero records; keeping previous list"
97 );
98 return;
99 }
100
101 debug!(
102 service = %service_name,
103 count = resolved.len(),
104 "SRV lookup resolved"
105 );
106 *store.write().await = resolved;
107 }
108 Err(e) => {
109 warn!(
110 service = %service_name,
111 error = %e,
112 "SRV lookup failed; keeping previous list"
113 );
114 }
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use super::*;
121
122 #[test]
123 fn srv_backends_starts_empty() {
124 let backends: SrvBackends = Arc::new(RwLock::new(Vec::new()));
127 let cloned = Arc::clone(&backends);
129 assert!(Arc::ptr_eq(&backends, &cloned));
130 }
131}