1use anyhow;
10use std::collections::HashMap;
11use tokio::sync::mpsc;
12use tokio::sync::oneshot;
13use tracing::{event, instrument, Level};
14
15use crate::entity::Entity;
16use crate::updater::updater_for;
17use crate::updater::Updater;
18use crate::version::Version;
19
20#[derive(Debug)]
21pub struct Server {
22 server_ch: mpsc::Sender<Message>,
23}
24
25#[derive(Debug)]
26pub struct Client {
27 server_ch: mpsc::Sender<Message>,
28}
29
30#[derive(Debug)]
31pub enum Message {
32 Request {
33 resource: String,
34 client_ch: oneshot::Sender<Option<Vec<Version>>>,
35 },
36 Downloaded {
37 resource: String,
38 versions: Option<Vec<Version>>,
39 },
40}
41
42type Cache = HashMap<String, Option<Vec<Version>>>;
43type Pending = HashMap<String, Vec<oneshot::Sender<Option<Vec<Version>>>>>;
44
45impl Server {
46 #[instrument(level = "debug")]
47 pub fn new() -> Server {
48 let (server_ch, mut queue): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
49 mpsc::channel(32);
50 let worker_ch = server_ch.clone();
51 tokio::spawn(async move {
52 event!(Level::INFO, "Server task started");
53 let mut pending: Pending = Default::default();
54 let mut cache: Cache = Default::default();
55 while let Some(msg) = queue.recv().await {
56 match msg {
57 Message::Request {
58 resource,
59 client_ch,
60 } => {
61 Server::handle_request(
62 worker_ch.clone(),
63 &cache,
64 &mut pending,
65 resource,
66 client_ch,
67 )
68 .await
69 }
70 Message::Downloaded { resource, versions } => {
71 cache.insert(resource.clone(), versions.clone());
72 if let Some(clients) = pending.remove(&resource) {
73 event!(
74 Level::INFO,
75 resource = resource,
76 num_clients = clients.len(),
77 "retrieved, answering pending"
78 );
79 for client_ch in clients {
80 client_ch.send(versions.clone()).unwrap();
81 }
82 } else {
83 event!(
84 Level::ERROR,
85 resource = resource,
86 "no pending request found"
87 );
88 }
89 }
90 }
91 }
92 });
93 Server { server_ch }
94 }
95
96 #[instrument(level = "debug")]
97 async fn handle_request(
98 worker_ch: mpsc::Sender<Message>,
99 cache: &Cache,
100 pending: &mut Pending,
101 resource: String,
102 client_ch: oneshot::Sender<Option<Vec<Version>>>,
103 ) {
104 if let Some(versions) = cache.get(&resource) {
105 event!(Level::INFO, resource = resource, "cache hit");
106 Server::worker_send(worker_ch, resource, versions.clone()).await;
107 return;
108 }
109 let updater = match updater_for(&resource) {
110 Ok(updater) => updater,
111 Err(e) => {
112 event!(
113 Level::ERROR,
114 resource = resource,
115 error = %e,
116 "error getting updater",
117 );
118 return;
119 }
120 };
121 let url = match updater.url(&resource) {
122 Some(url) => url,
123 None => {
124 event!(
125 Level::ERROR,
126 resource = resource,
127 updater = ?updater,
128 "updater could not parse url",
129 );
130 return;
131 }
132 };
133 let e = pending.entry(resource.clone()).or_default();
134 if e.is_empty() {
135 event!(Level::INFO, resource = resource, "downloader task started");
136 tokio::spawn(async move {
137 match updater.get_versions(&url).await {
138 Ok(versions) => {
139 Server::worker_send(worker_ch, resource, Some(versions)).await;
140 }
141 Err(e) => {
142 event!(
143 Level::ERROR,
144 resource = resource,
145 updater = ?updater,
146 error = %e,
147 "error in get_version"
148 );
149 Server::worker_send(worker_ch, resource, None).await;
150 }
151 };
152 });
153 } else {
154 event!(
155 Level::INFO,
156 resource = resource,
157 "downloader task already present"
158 );
159 }
160 e.push(client_ch);
161 }
162
163 #[instrument(level = "debug")]
164 async fn worker_send(
165 worker_ch: mpsc::Sender<Message>,
166 resource: String,
167 versions: Option<Vec<Version>>,
168 ) {
169 if let Err(e) = worker_ch
170 .send(Message::Downloaded { resource, versions })
171 .await
172 {
173 event!(
174 Level::ERROR,
175 error = %e,
176 "error sending download to server task"
177 );
178 }
179 }
180
181 #[instrument(level = "debug")]
182 pub fn new_client(&self) -> Client {
183 Client {
184 server_ch: self.server_ch.clone(),
185 }
186 }
187}
188
189impl Default for Server {
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195impl Client {
196 #[instrument(level = "debug")]
197 pub async fn get_versions(&self, resource: &str) -> anyhow::Result<Option<Vec<Version>>> {
198 let (client_ch, response) = oneshot::channel();
199 self.server_ch
200 .send(Message::Request {
201 resource: resource.to_owned(),
202 client_ch,
203 })
204 .await?;
205 Ok(response.await?)
206 }
207
208 #[instrument(level = "debug")]
209 pub async fn resolve_entity(&self, mut entity: Entity) -> Entity {
210 let versions = match self.get_versions(&entity.resource).await {
211 Ok(versions) => versions.unwrap_or_default(),
212 Err(e) => {
213 event!(
214 Level::ERROR,
215 resource = entity.resource,
216 error = %e,
217 "error getting version",
218 );
219 return entity;
220 }
221 };
222 if versions.is_empty() {
223 event!(
224 Level::ERROR,
225 resource = entity.resource,
226 versions = ?versions,
227 "no version found",
228 );
229 return entity;
230 } else if !versions.contains(&entity.version) {
231 event!(
232 Level::WARN,
233 resource = entity.resource,
234 current = %entity.version,
235 versions = ?versions,
236 "current version not present in version list",
237 );
238 }
239 let latest = versions.iter().max().unwrap();
240 event!(
241 Level::INFO,
242 resource = entity.resource,
243 versions = ?versions,
244 latest = %latest,
245 "got versions",
246 );
247 entity.latest = Some(latest.clone());
248 if let Ok(updater) = updater_for(&entity.resource) {
249 entity.updated_line = updater.updated_line(&entity);
250 }
251 entity
252 }
253}