github_workflow_update/
resolver.rs

1// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
2// This file is subject to the terms and conditions defined in
3// file 'LICENSE', which is part of this source code package.
4
5//! A resolver [`Server`] that makes async requests and caches the
6//! result, and an async [`Client`] that fills the `entity.latest`
7//! field with the latest version of the upstream entity.
8
9use anyhow;
10use std::collections::HashMap;
11use tokio::sync::mpsc;
12use tokio::sync::oneshot;
13use tracing::{event, instrument, Level};
14
15use crate::entity::Entity;
16use crate::updater::updater_for;
17use crate::updater::Updater;
18use crate::version::Version;
19
20#[derive(Debug)]
21pub struct Server {
22    server_ch: mpsc::Sender<Message>,
23}
24
25#[derive(Debug)]
26pub struct Client {
27    server_ch: mpsc::Sender<Message>,
28}
29
30#[derive(Debug)]
31pub enum Message {
32    Request {
33        resource: String,
34        client_ch: oneshot::Sender<Option<Vec<Version>>>,
35    },
36    Downloaded {
37        resource: String,
38        versions: Option<Vec<Version>>,
39    },
40}
41
42type Cache = HashMap<String, Option<Vec<Version>>>;
43type Pending = HashMap<String, Vec<oneshot::Sender<Option<Vec<Version>>>>>;
44
45impl Server {
46    #[instrument(level = "debug")]
47    pub fn new() -> Server {
48        let (server_ch, mut queue): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
49            mpsc::channel(32);
50        let worker_ch = server_ch.clone();
51        tokio::spawn(async move {
52            event!(Level::INFO, "Server task started");
53            let mut pending: Pending = Default::default();
54            let mut cache: Cache = Default::default();
55            while let Some(msg) = queue.recv().await {
56                match msg {
57                    Message::Request {
58                        resource,
59                        client_ch,
60                    } => {
61                        Server::handle_request(
62                            worker_ch.clone(),
63                            &cache,
64                            &mut pending,
65                            resource,
66                            client_ch,
67                        )
68                        .await
69                    }
70                    Message::Downloaded { resource, versions } => {
71                        cache.insert(resource.clone(), versions.clone());
72                        if let Some(clients) = pending.remove(&resource) {
73                            event!(
74                                Level::INFO,
75                                resource = resource,
76                                num_clients = clients.len(),
77                                "retrieved, answering pending"
78                            );
79                            for client_ch in clients {
80                                client_ch.send(versions.clone()).unwrap();
81                            }
82                        } else {
83                            event!(
84                                Level::ERROR,
85                                resource = resource,
86                                "no pending request found"
87                            );
88                        }
89                    }
90                }
91            }
92        });
93        Server { server_ch }
94    }
95
96    #[instrument(level = "debug")]
97    async fn handle_request(
98        worker_ch: mpsc::Sender<Message>,
99        cache: &Cache,
100        pending: &mut Pending,
101        resource: String,
102        client_ch: oneshot::Sender<Option<Vec<Version>>>,
103    ) {
104        if let Some(versions) = cache.get(&resource) {
105            event!(Level::INFO, resource = resource, "cache hit");
106            Server::worker_send(worker_ch, resource, versions.clone()).await;
107            return;
108        }
109        let updater = match updater_for(&resource) {
110            Ok(updater) => updater,
111            Err(e) => {
112                event!(
113                    Level::ERROR,
114                    resource = resource,
115                    error = %e,
116                    "error getting updater",
117                );
118                return;
119            }
120        };
121        let url = match updater.url(&resource) {
122            Some(url) => url,
123            None => {
124                event!(
125                    Level::ERROR,
126                    resource = resource,
127                    updater = ?updater,
128                    "updater could not parse url",
129                );
130                return;
131            }
132        };
133        let e = pending.entry(resource.clone()).or_default();
134        if e.is_empty() {
135            event!(Level::INFO, resource = resource, "downloader task started");
136            tokio::spawn(async move {
137                match updater.get_versions(&url).await {
138                    Ok(versions) => {
139                        Server::worker_send(worker_ch, resource, Some(versions)).await;
140                    }
141                    Err(e) => {
142                        event!(
143                            Level::ERROR,
144                            resource = resource,
145                            updater = ?updater,
146                            error = %e,
147                            "error in get_version"
148                        );
149                        Server::worker_send(worker_ch, resource, None).await;
150                    }
151                };
152            });
153        } else {
154            event!(
155                Level::INFO,
156                resource = resource,
157                "downloader task already present"
158            );
159        }
160        e.push(client_ch);
161    }
162
163    #[instrument(level = "debug")]
164    async fn worker_send(
165        worker_ch: mpsc::Sender<Message>,
166        resource: String,
167        versions: Option<Vec<Version>>,
168    ) {
169        if let Err(e) = worker_ch
170            .send(Message::Downloaded { resource, versions })
171            .await
172        {
173            event!(
174                Level::ERROR,
175                error = %e,
176                "error sending download to server task"
177            );
178        }
179    }
180
181    #[instrument(level = "debug")]
182    pub fn new_client(&self) -> Client {
183        Client {
184            server_ch: self.server_ch.clone(),
185        }
186    }
187}
188
189impl Default for Server {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195impl Client {
196    #[instrument(level = "debug")]
197    pub async fn get_versions(&self, resource: &str) -> anyhow::Result<Option<Vec<Version>>> {
198        let (client_ch, response) = oneshot::channel();
199        self.server_ch
200            .send(Message::Request {
201                resource: resource.to_owned(),
202                client_ch,
203            })
204            .await?;
205        Ok(response.await?)
206    }
207
208    #[instrument(level = "debug")]
209    pub async fn resolve_entity(&self, mut entity: Entity) -> Entity {
210        let versions = match self.get_versions(&entity.resource).await {
211            Ok(versions) => versions.unwrap_or_default(),
212            Err(e) => {
213                event!(
214                    Level::ERROR,
215                    resource = entity.resource,
216                    error = %e,
217                    "error getting version",
218                );
219                return entity;
220            }
221        };
222        if versions.is_empty() {
223            event!(
224                Level::ERROR,
225                resource = entity.resource,
226                versions = ?versions,
227                "no version found",
228            );
229            return entity;
230        } else if !versions.contains(&entity.version) {
231            event!(
232                Level::WARN,
233                resource = entity.resource,
234                current = %entity.version,
235                versions = ?versions,
236                "current version not present in version list",
237            );
238        }
239        let latest = versions.iter().max().unwrap();
240        event!(
241            Level::INFO,
242            resource = entity.resource,
243            versions = ?versions,
244            latest = %latest,
245            "got versions",
246        );
247        entity.latest = Some(latest.clone());
248        if let Ok(updater) = updater_for(&entity.resource) {
249            entity.updated_line = updater.updated_line(&entity);
250        }
251        entity
252    }
253}