Skip to main content

ref_solver/refget/
enrichment.rs

1//! Async enrichment logic for querying refget servers about unknown contigs.
2
3use std::sync::Arc;
4
5use tokio::sync::Semaphore;
6
7use crate::core::contig::Contig;
8use crate::refget::{EnrichedContig, RefgetAlias, RefgetConfig, RefgetLookupResult};
9
10/// Look up metadata for a set of contigs from a refget server.
11///
12/// Only contigs that have an MD5 digest are queried. Lookups run concurrently
13/// up to `config.max_concurrent` at a time. Errors are captured per-contig
14/// and never propagate — the caller always gets results back.
15///
16/// # Panics
17///
18/// Panics if the internal semaphore is closed, which should not happen
19/// during normal operation.
20pub async fn enrich_contigs(contigs: &[Contig], config: &RefgetConfig) -> Vec<EnrichedContig> {
21    let semaphore = Arc::new(Semaphore::new(config.max_concurrent));
22
23    // Build a reqwest client with the configured timeout
24    let http_client = match reqwest::Client::builder().timeout(config.timeout).build() {
25        Ok(c) => c,
26        Err(e) => {
27            tracing::warn!("Failed to create HTTP client for refget: {e}");
28            return error_for_all(contigs, &format!("Failed to create HTTP client: {e}"));
29        }
30    };
31
32    let refget_client =
33        match refget_client::RefgetClient::with_client(http_client, &config.server_url) {
34            Ok(c) => Arc::new(c),
35            Err(e) => {
36                tracing::warn!("Failed to create refget client: {e}");
37                return error_for_all(contigs, &format!("Failed to create refget client: {e}"));
38            }
39        };
40
41    let mut join_set = tokio::task::JoinSet::new();
42
43    for (idx, contig) in contigs.iter().enumerate() {
44        let md5 = match &contig.md5 {
45            Some(md5) => md5.clone(),
46            None => {
47                // No MD5 — we can't query refget, skip this contig.
48                // We'll handle these in the result collection below.
49                continue;
50            }
51        };
52
53        let sem = Arc::clone(&semaphore);
54        let client = Arc::clone(&refget_client);
55
56        join_set.spawn(async move {
57            let _permit = sem.acquire().await.expect("semaphore closed unexpectedly");
58            let result = lookup_single(&client, &md5).await;
59            (idx, result)
60        });
61    }
62
63    // Collect results, preserving original order
64    let mut results: Vec<Option<RefgetLookupResult>> = vec![None; contigs.len()];
65
66    while let Some(join_result) = join_set.join_next().await {
67        match join_result {
68            Ok((idx, lookup_result)) => {
69                results[idx] = Some(lookup_result);
70            }
71            Err(e) => {
72                tracing::warn!("Refget lookup task panicked: {e}");
73            }
74        }
75    }
76
77    contigs
78        .iter()
79        .enumerate()
80        .map(|(idx, contig)| {
81            let refget_metadata = results[idx].take().unwrap_or_else(|| {
82                if contig.md5.is_none() {
83                    RefgetLookupResult::Error {
84                        message: "No MD5 digest available for lookup".to_string(),
85                    }
86                } else {
87                    RefgetLookupResult::Error {
88                        message: "Lookup task failed".to_string(),
89                    }
90                }
91            });
92
93            EnrichedContig {
94                name: contig.name.clone(),
95                md5: contig.md5.clone(),
96                refget_metadata,
97            }
98        })
99        .collect()
100}
101
102/// Create error results for all contigs when the client cannot be initialized.
103fn error_for_all(contigs: &[Contig], message: &str) -> Vec<EnrichedContig> {
104    contigs
105        .iter()
106        .map(|c| EnrichedContig {
107            name: c.name.clone(),
108            md5: c.md5.clone(),
109            refget_metadata: RefgetLookupResult::Error {
110                message: message.to_string(),
111            },
112        })
113        .collect()
114}
115
116/// Query the refget server for a single MD5 digest.
117async fn lookup_single(client: &refget_client::RefgetClient, md5: &str) -> RefgetLookupResult {
118    match client.get_metadata(md5).await {
119        Ok(Some(metadata)) => RefgetLookupResult::Found {
120            aliases: metadata
121                .aliases
122                .into_iter()
123                .map(|a| RefgetAlias {
124                    naming_authority: a.naming_authority,
125                    value: a.value,
126                })
127                .collect(),
128            sha512t24u: metadata.sha512t24u,
129            circular: metadata.circular,
130        },
131        Ok(None) => RefgetLookupResult::NotFound,
132        Err(e) => {
133            tracing::debug!("Refget lookup failed for {md5}: {e}");
134            RefgetLookupResult::Error {
135                message: e.to_string(),
136            }
137        }
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::contig::Contig;

    // All tests point at 192.0.2.1, an address in the TEST-NET-1 block that
    // RFC 5737 reserves for documentation, so no real refget server is hit.

    /// Message `enrich_contigs` attaches to contigs that have no MD5 digest.
    const NO_MD5_MESSAGE: &str = "No MD5 digest available for lookup";

    /// A contig without an MD5 is returned with the dedicated "no MD5" error.
    #[tokio::test]
    async fn test_contigs_without_md5_are_skipped() {
        let config = RefgetConfig::new("http://192.0.2.1:1");
        let contigs = vec![Contig::new("chr_no_md5".to_string(), 1000)];

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].name, "chr_no_md5");
        assert!(results[0].md5.is_none());
        // Pin the message so this fails if the contig was queried instead of
        // skipped (a failed query would carry a different error text).
        assert!(matches!(
            &results[0].refget_metadata,
            RefgetLookupResult::Error { message } if message == NO_MD5_MESSAGE
        ));
    }

    /// An unreachable server yields a per-contig error rather than a panic.
    #[tokio::test]
    async fn test_invalid_server_produces_errors() {
        // Short timeout keeps the test fast while the connection attempt fails.
        let mut config = RefgetConfig::new("http://192.0.2.1:1");
        config.timeout = std::time::Duration::from_millis(100);

        let contigs = vec![
            Contig::new("chr1".to_string(), 1000).with_md5("6aef897c3d6ff0c78aff06ac189178dd")
        ];

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].name, "chr1");
        // Should be an error due to connection failure/timeout
        assert!(matches!(
            results[0].refget_metadata,
            RefgetLookupResult::Error { .. }
        ));
    }

    /// Output order matches input order when queried and skipped contigs mix.
    #[tokio::test]
    async fn test_mixed_contigs_with_and_without_md5() {
        let mut config = RefgetConfig::new("http://192.0.2.1:1");
        config.timeout = std::time::Duration::from_millis(100);

        let contigs = vec![
            Contig::new("chr1".to_string(), 1000).with_md5("6aef897c3d6ff0c78aff06ac189178dd"),
            Contig::new("chrUn".to_string(), 1000),
            Contig::new("chr2".to_string(), 1000).with_md5("b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2"),
        ];

        let results = enrich_contigs(&contigs, &config).await;

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].name, "chr1");
        assert_eq!(results[1].name, "chrUn");
        assert_eq!(results[2].name, "chr2");

        // chrUn has no MD5 — it must carry the "no MD5" error specifically.
        assert!(results[1].md5.is_none());
        assert!(matches!(
            &results[1].refget_metadata,
            RefgetLookupResult::Error { message } if message == NO_MD5_MESSAGE
        ));

        // chr1 and chr2 were queried against an unreachable server, so both
        // must keep their digest and report an error of their own.
        for idx in [0, 2] {
            assert!(results[idx].md5.is_some());
            assert!(matches!(
                results[idx].refget_metadata,
                RefgetLookupResult::Error { .. }
            ));
        }
    }

    /// With more contigs than permits, every lookup still completes.
    ///
    /// This does not measure how many lookups run at once; it only checks
    /// that queuing behind the concurrency limit neither drops nor hangs any
    /// contig.
    #[tokio::test]
    async fn test_concurrency_limit_respected() {
        let mut config = RefgetConfig::new("http://192.0.2.1:1");
        config.timeout = std::time::Duration::from_millis(100);
        config.max_concurrent = 2; // Limit to 2

        // Create more contigs than the concurrency limit
        let contigs: Vec<Contig> = (0..5)
            .map(|i| Contig::new(format!("chr{i}"), 1000).with_md5(format!("{i:032x}")))
            .collect();

        let results = enrich_contigs(&contigs, &config).await;

        // All should complete (with errors due to unreachable server)
        assert_eq!(results.len(), 5);
        for result in &results {
            assert!(matches!(
                result.refget_metadata,
                RefgetLookupResult::Error { .. }
            ));
        }
    }
}