github_workflows_update/
proxy.rs

1// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
2// This file is subject to the terms and conditions defined in
3// file 'LICENSE', which is part of this source code package.
4
5//! The proxy [`Server`] spawns a task that makes async requests and
6//! caches the result, while async [`Client`] provides the API.
7
8use anyhow;
9use std::collections::HashMap;
10use tokio::sync::mpsc;
11use tokio::sync::oneshot;
12use tracing::{Level, event, instrument};
13
14use crate::resource::Resource;
15use crate::version::Version;
16
17#[derive(Debug)]
18pub struct Server {
19    server_ch: mpsc::Sender<Message>,
20}
21
22#[derive(Debug)]
23pub struct Client {
24    server_ch: mpsc::Sender<Message>,
25}
26
27#[derive(Debug)]
28pub enum Message {
29    Request {
30        resource: Resource,
31        client_ch: oneshot::Sender<Option<Vec<Version>>>,
32    },
33    Downloaded {
34        resource: Resource,
35        versions: Option<Vec<Version>>,
36    },
37}
38
39type Cache = HashMap<Resource, Option<Vec<Version>>>;
40type Pending = HashMap<Resource, Vec<oneshot::Sender<Option<Vec<Version>>>>>;
41
42impl Server {
43    #[instrument(level = "debug")]
44    pub fn new() -> Server {
45        let (server_ch, mut queue): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
46            mpsc::channel(32);
47        let worker_ch = server_ch.clone();
48        tokio::spawn(async move {
49            event!(Level::INFO, "Server task started");
50            let mut pending: Pending = Default::default();
51            let mut cache: Cache = Default::default();
52            while let Some(msg) = queue.recv().await {
53                match msg {
54                    Message::Request {
55                        resource,
56                        client_ch,
57                    } => {
58                        Server::handle_request(
59                            worker_ch.clone(),
60                            &cache,
61                            &mut pending,
62                            resource,
63                            client_ch,
64                        )
65                        .await
66                    }
67                    Message::Downloaded { resource, versions } => {
68                        cache.insert(resource.clone(), versions.clone());
69                        if let Some(clients) = pending.remove(&resource) {
70                            event!(
71                                Level::INFO,
72                                resource = %resource,
73                                num_clients = clients.len(),
74                                "retrieved, answering pending"
75                            );
76                            for client_ch in clients {
77                                client_ch.send(versions.clone()).unwrap();
78                            }
79                        } else {
80                            event!(
81                                Level::ERROR,
82                                resource = %resource,
83                                "no pending request found"
84                            );
85                        }
86                    }
87                }
88            }
89        });
90        Server { server_ch }
91    }
92
93    #[instrument(level = "debug")]
94    async fn handle_request(
95        worker_ch: mpsc::Sender<Message>,
96        cache: &Cache,
97        pending: &mut Pending,
98        resource: Resource,
99        client_ch: oneshot::Sender<Option<Vec<Version>>>,
100    ) {
101        if let Some(versions) = cache.get(&resource) {
102            event!(Level::INFO, resource = %resource, "cache hit");
103            Server::worker_send(worker_ch, resource, versions.clone()).await;
104            return;
105        }
106        let e = pending.entry(resource.clone()).or_default();
107        if e.is_empty() {
108            event!(Level::INFO, resource = %resource, "downloader task started");
109            tokio::spawn(async move {
110                match resource.get_versions().await {
111                    Ok(versions) => {
112                        Server::worker_send(worker_ch, resource, Some(versions)).await;
113                    }
114                    Err(e) => {
115                        event!(
116                            Level::ERROR,
117                            resource = %resource,
118                            error = %e,
119                            "error in get_version"
120                        );
121                        Server::worker_send(worker_ch, resource, None).await;
122                    }
123                };
124            });
125        } else {
126            event!(
127                Level::INFO,
128                resource = %resource,
129                "downloader task already present"
130            );
131        }
132        e.push(client_ch);
133    }
134
135    #[instrument(level = "debug")]
136    async fn worker_send(
137        worker_ch: mpsc::Sender<Message>,
138        resource: Resource,
139        versions: Option<Vec<Version>>,
140    ) {
141        if let Err(e) = worker_ch
142            .send(Message::Downloaded { resource, versions })
143            .await
144        {
145            event!(
146                Level::ERROR,
147                error = %e,
148                "error sending download to server task"
149            );
150        }
151    }
152
153    #[instrument(level = "debug")]
154    pub fn new_client(&self) -> Client {
155        Client {
156            server_ch: self.server_ch.clone(),
157        }
158    }
159}
160
161impl Default for Server {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl Client {
168    #[instrument(level = "debug")]
169    pub async fn get_versions(&self, resource: &Resource) -> anyhow::Result<Option<Vec<Version>>> {
170        let (client_ch, response) = oneshot::channel();
171        self.server_ch
172            .send(Message::Request {
173                resource: resource.clone(),
174                client_ch,
175            })
176            .await?;
177        Ok(response.await?)
178    }
179
180    #[instrument(level = "debug")]
181    pub async fn fetch_latest_version(
182        &self,
183        resource: &Resource,
184        current_version: &Version,
185    ) -> Option<(Resource, Version)> {
186        let versions = match self.get_versions(resource).await {
187            Ok(versions) => versions.unwrap_or_default(),
188            Err(e) => {
189                event!(
190                    Level::ERROR,
191                    resource = %resource,
192                    error = %e,
193                    "error getting version",
194                );
195                return None;
196            }
197        };
198        if versions.is_empty() {
199            event!(
200                Level::ERROR,
201                resource = %resource,
202                versions = ?versions,
203                "no version found",
204            );
205            return None;
206        } else if !versions.contains(current_version) {
207            event!(
208                Level::WARN,
209                resource = %resource,
210                current = %current_version,
211                versions = ?versions,
212                "current version not present in version list",
213            );
214        }
215        let latest = versions.iter().max().unwrap();
216        event!(
217            Level::INFO,
218            resource = %resource,
219            versions = ?versions,
220            latest = %latest,
221            "got versions",
222        );
223        Some((resource.clone(), latest.clone()))
224    }
225}