github_workflows_update/
proxy.rs1use anyhow;
9use std::collections::HashMap;
10use tokio::sync::mpsc;
11use tokio::sync::oneshot;
12use tracing::{Level, event, instrument};
13
14use crate::resource::Resource;
15use crate::version::Version;
16
17#[derive(Debug)]
18pub struct Server {
19 server_ch: mpsc::Sender<Message>,
20}
21
22#[derive(Debug)]
23pub struct Client {
24 server_ch: mpsc::Sender<Message>,
25}
26
27#[derive(Debug)]
28pub enum Message {
29 Request {
30 resource: Resource,
31 client_ch: oneshot::Sender<Option<Vec<Version>>>,
32 },
33 Downloaded {
34 resource: Resource,
35 versions: Option<Vec<Version>>,
36 },
37}
38
39type Cache = HashMap<Resource, Option<Vec<Version>>>;
40type Pending = HashMap<Resource, Vec<oneshot::Sender<Option<Vec<Version>>>>>;
41
42impl Server {
43 #[instrument(level = "debug")]
44 pub fn new() -> Server {
45 let (server_ch, mut queue): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
46 mpsc::channel(32);
47 let worker_ch = server_ch.clone();
48 tokio::spawn(async move {
49 event!(Level::INFO, "Server task started");
50 let mut pending: Pending = Default::default();
51 let mut cache: Cache = Default::default();
52 while let Some(msg) = queue.recv().await {
53 match msg {
54 Message::Request {
55 resource,
56 client_ch,
57 } => {
58 Server::handle_request(
59 worker_ch.clone(),
60 &cache,
61 &mut pending,
62 resource,
63 client_ch,
64 )
65 .await
66 }
67 Message::Downloaded { resource, versions } => {
68 cache.insert(resource.clone(), versions.clone());
69 if let Some(clients) = pending.remove(&resource) {
70 event!(
71 Level::INFO,
72 resource = %resource,
73 num_clients = clients.len(),
74 "retrieved, answering pending"
75 );
76 for client_ch in clients {
77 client_ch.send(versions.clone()).unwrap();
78 }
79 } else {
80 event!(
81 Level::ERROR,
82 resource = %resource,
83 "no pending request found"
84 );
85 }
86 }
87 }
88 }
89 });
90 Server { server_ch }
91 }
92
93 #[instrument(level = "debug")]
94 async fn handle_request(
95 worker_ch: mpsc::Sender<Message>,
96 cache: &Cache,
97 pending: &mut Pending,
98 resource: Resource,
99 client_ch: oneshot::Sender<Option<Vec<Version>>>,
100 ) {
101 if let Some(versions) = cache.get(&resource) {
102 event!(Level::INFO, resource = %resource, "cache hit");
103 Server::worker_send(worker_ch, resource, versions.clone()).await;
104 return;
105 }
106 let e = pending.entry(resource.clone()).or_default();
107 if e.is_empty() {
108 event!(Level::INFO, resource = %resource, "downloader task started");
109 tokio::spawn(async move {
110 match resource.get_versions().await {
111 Ok(versions) => {
112 Server::worker_send(worker_ch, resource, Some(versions)).await;
113 }
114 Err(e) => {
115 event!(
116 Level::ERROR,
117 resource = %resource,
118 error = %e,
119 "error in get_version"
120 );
121 Server::worker_send(worker_ch, resource, None).await;
122 }
123 };
124 });
125 } else {
126 event!(
127 Level::INFO,
128 resource = %resource,
129 "downloader task already present"
130 );
131 }
132 e.push(client_ch);
133 }
134
135 #[instrument(level = "debug")]
136 async fn worker_send(
137 worker_ch: mpsc::Sender<Message>,
138 resource: Resource,
139 versions: Option<Vec<Version>>,
140 ) {
141 if let Err(e) = worker_ch
142 .send(Message::Downloaded { resource, versions })
143 .await
144 {
145 event!(
146 Level::ERROR,
147 error = %e,
148 "error sending download to server task"
149 );
150 }
151 }
152
153 #[instrument(level = "debug")]
154 pub fn new_client(&self) -> Client {
155 Client {
156 server_ch: self.server_ch.clone(),
157 }
158 }
159}
160
161impl Default for Server {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl Client {
168 #[instrument(level = "debug")]
169 pub async fn get_versions(&self, resource: &Resource) -> anyhow::Result<Option<Vec<Version>>> {
170 let (client_ch, response) = oneshot::channel();
171 self.server_ch
172 .send(Message::Request {
173 resource: resource.clone(),
174 client_ch,
175 })
176 .await?;
177 Ok(response.await?)
178 }
179
180 #[instrument(level = "debug")]
181 pub async fn fetch_latest_version(
182 &self,
183 resource: &Resource,
184 current_version: &Version,
185 ) -> Option<(Resource, Version)> {
186 let versions = match self.get_versions(resource).await {
187 Ok(versions) => versions.unwrap_or_default(),
188 Err(e) => {
189 event!(
190 Level::ERROR,
191 resource = %resource,
192 error = %e,
193 "error getting version",
194 );
195 return None;
196 }
197 };
198 if versions.is_empty() {
199 event!(
200 Level::ERROR,
201 resource = %resource,
202 versions = ?versions,
203 "no version found",
204 );
205 return None;
206 } else if !versions.contains(current_version) {
207 event!(
208 Level::WARN,
209 resource = %resource,
210 current = %current_version,
211 versions = ?versions,
212 "current version not present in version list",
213 );
214 }
215 let latest = versions.iter().max().unwrap();
216 event!(
217 Level::INFO,
218 resource = %resource,
219 versions = ?versions,
220 latest = %latest,
221 "got versions",
222 );
223 Some((resource.clone(), latest.clone()))
224 }
225}