ref_solver/refget/
enrichment.rs1use std::sync::Arc;
4
5use tokio::sync::Semaphore;
6
7use crate::core::contig::Contig;
8use crate::refget::{EnrichedContig, RefgetAlias, RefgetConfig, RefgetLookupResult};
9
10pub async fn enrich_contigs(contigs: &[Contig], config: &RefgetConfig) -> Vec<EnrichedContig> {
21 let semaphore = Arc::new(Semaphore::new(config.max_concurrent));
22
23 let http_client = match reqwest::Client::builder().timeout(config.timeout).build() {
25 Ok(c) => c,
26 Err(e) => {
27 tracing::warn!("Failed to create HTTP client for refget: {e}");
28 return error_for_all(contigs, &format!("Failed to create HTTP client: {e}"));
29 }
30 };
31
32 let refget_client =
33 match refget_client::RefgetClient::with_client(http_client, &config.server_url) {
34 Ok(c) => Arc::new(c),
35 Err(e) => {
36 tracing::warn!("Failed to create refget client: {e}");
37 return error_for_all(contigs, &format!("Failed to create refget client: {e}"));
38 }
39 };
40
41 let mut join_set = tokio::task::JoinSet::new();
42
43 for (idx, contig) in contigs.iter().enumerate() {
44 let md5 = match &contig.md5 {
45 Some(md5) => md5.clone(),
46 None => {
47 continue;
50 }
51 };
52
53 let sem = Arc::clone(&semaphore);
54 let client = Arc::clone(&refget_client);
55
56 join_set.spawn(async move {
57 let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
58 let result = lookup_single(&client, &md5).await;
59 (idx, result)
60 });
61 }
62
63 let mut results: Vec<Option<RefgetLookupResult>> = vec![None; contigs.len()];
65
66 while let Some(join_result) = join_set.join_next().await {
67 match join_result {
68 Ok((idx, lookup_result)) => {
69 results[idx] = Some(lookup_result);
70 }
71 Err(e) => {
72 tracing::warn!("Refget lookup task panicked: {e}");
73 }
74 }
75 }
76
77 contigs
78 .iter()
79 .enumerate()
80 .map(|(idx, contig)| {
81 let refget_metadata = results[idx].take().unwrap_or_else(|| {
82 if contig.md5.is_none() {
83 RefgetLookupResult::Error {
84 message: "No MD5 digest available for lookup".to_string(),
85 }
86 } else {
87 RefgetLookupResult::Error {
88 message: "Lookup task failed".to_string(),
89 }
90 }
91 });
92
93 EnrichedContig {
94 name: contig.name.clone(),
95 md5: contig.md5.clone(),
96 refget_metadata,
97 }
98 })
99 .collect()
100}
101
102fn error_for_all(contigs: &[Contig], message: &str) -> Vec<EnrichedContig> {
104 contigs
105 .iter()
106 .map(|c| EnrichedContig {
107 name: c.name.clone(),
108 md5: c.md5.clone(),
109 refget_metadata: RefgetLookupResult::Error {
110 message: message.to_string(),
111 },
112 })
113 .collect()
114}
115
116async fn lookup_single(client: &refget_client::RefgetClient, md5: &str) -> RefgetLookupResult {
118 match client.get_metadata(md5).await {
119 Ok(Some(metadata)) => RefgetLookupResult::Found {
120 aliases: metadata
121 .aliases
122 .into_iter()
123 .map(|a| RefgetAlias {
124 naming_authority: a.naming_authority,
125 value: a.value,
126 })
127 .collect(),
128 sha512t24u: metadata.sha512t24u,
129 circular: metadata.circular,
130 },
131 Ok(None) => RefgetLookupResult::NotFound,
132 Err(e) => {
133 tracing::debug!("Refget lookup failed for {md5}: {e}");
134 RefgetLookupResult::Error {
135 message: e.to_string(),
136 }
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::contig::Contig;

    /// Server URL the tests treat as unreachable. 192.0.2.0/24 is reserved for
    /// documentation (RFC 5737), so no real refget server should answer there.
    const UNREACHABLE_SERVER: &str = "http://192.0.2.1:1";

    /// Config pointing at the unreachable server with a short timeout, so
    /// lookups fail quickly.
    fn fast_failing_config() -> RefgetConfig {
        let mut config = RefgetConfig::new(UNREACHABLE_SERVER);
        config.timeout = std::time::Duration::from_millis(100);
        config
    }

    /// A contig without an MD5 is never looked up, but it is still returned
    /// (with an `Error` result).
    #[tokio::test]
    async fn test_contigs_without_md5_are_skipped() {
        let config = RefgetConfig::new(UNREACHABLE_SERVER);
        let contigs = vec![Contig::new("chr_no_md5".to_string(), 1000)];

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.name, "chr_no_md5");
        assert!(only.md5.is_none());
        assert!(matches!(
            only.refget_metadata,
            RefgetLookupResult::Error { .. }
        ));
    }

    #[tokio::test]
    async fn test_invalid_server_produces_errors() {
        let config = fast_failing_config();
        let contigs =
            vec![Contig::new("chr1".to_string(), 1000).with_md5("6aef897c3d6ff0c78aff06ac189178dd")];

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.name, "chr1");
        assert!(matches!(
            only.refget_metadata,
            RefgetLookupResult::Error { .. }
        ));
    }

    #[tokio::test]
    async fn test_mixed_contigs_with_and_without_md5() {
        let config = fast_failing_config();
        let contigs = vec![
            Contig::new("chr1".to_string(), 1000).with_md5("6aef897c3d6ff0c78aff06ac189178dd"),
            Contig::new("chrUn".to_string(), 1000),
            Contig::new("chr2".to_string(), 1000).with_md5("b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2"),
        ];

        let results = enrich_contigs(&contigs, &config).await;

        // Output order must match input order.
        let expected_names = ["chr1", "chrUn", "chr2"];
        assert_eq!(results.len(), expected_names.len());
        for (result, name) in results.iter().zip(expected_names) {
            assert_eq!(result.name, name);
        }

        let unplaced = &results[1];
        assert!(unplaced.md5.is_none());
        assert!(matches!(
            unplaced.refget_metadata,
            RefgetLookupResult::Error { .. }
        ));
    }

    #[tokio::test]
    async fn test_concurrency_limit_respected() {
        let mut config = fast_failing_config();
        config.max_concurrent = 2;

        let contigs: Vec<Contig> = (0..5)
            .map(|i| Contig::new(format!("chr{i}"), 1000).with_md5(format!("{i:032x}")))
            .collect();

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 5);
        assert!(results
            .iter()
            .all(|r| matches!(r.refget_metadata, RefgetLookupResult::Error { .. })));
    }
}