1use chrono::{DateTime, Utc};
2use strum::{Display, EnumString};
3
4use boulder::{BuildableWithPersianRug, GeneratableWithPersianRug};
5use boulder::{Inc, Pattern};
6use django_query::filtering::{ops::Scalar, FilterableWithPersianRug};
7use django_query::{row::IntoRowWithPersianRug, sorting::SortableWithPersianRug};
8
9use persian_rug::{contextual, Context};
10
/// A worker record that lives in a `persian_rug` context `C`.
///
/// The `django_query` derives provide row output, filtering and sorting for
/// this type; the `boulder` derives provide a builder and a generator. Each
/// field's `#[boulder(...)]` attribute gives the value used when the builder
/// or generator is not told otherwise, and each `#[django(...)]` attribute
/// configures how the field takes part in sorting and filtering.
#[derive(
    Clone,
    Debug,
    IntoRowWithPersianRug,
    FilterableWithPersianRug,
    SortableWithPersianRug,
    BuildableWithPersianRug,
    GeneratableWithPersianRug,
)]
#[boulder(persian_rug(context=C, access(Worker<C>)))]
#[django(persian_rug(context=C, access(Worker<C>)))]
#[contextual(C)]
pub struct Worker<C: Context + 'static> {
    // Zero-sized marker tying the struct to its context type `C`; marked
    // `exclude` for the django_query derives.
    #[django(exclude)]
    _marker: core::marker::PhantomData<C>,
    /// Host name. Generated values follow the pattern `a-test-worker-{}`,
    /// numbered `1`, `2`, ... (see the tests in this file).
    #[boulder(generator=Pattern!("a-test-worker-{}", Inc(1)))]
    #[django(sort, op(in, contains, icontains, startswith, endswith))]
    pub hostname: String,
    /// Optional free-text description; defaults to `"A test worker"`.
    #[boulder(default=Some("A test worker".to_string()))]
    #[django(sort, op(in, contains, icontains, startswith, endswith))]
    pub description: Option<String>,
    /// Time of the last ping, if any; defaults to 2022-03-17T17:00:00 UTC.
    #[boulder(default=DateTime::parse_from_rfc3339("2022-03-17T17:00:00-00:00").unwrap().with_timezone(&Utc))]
    #[django(sort, op(lt, gt))]
    pub last_ping: Option<DateTime<Utc>>,
    /// Current [`State`]; defaults to `State::Online`.
    #[boulder(default=State::Online)]
    #[django(sort)]
    pub state: State,
    /// Current [`Health`]; defaults to `Health::Active`.
    #[boulder(default=Health::Active)]
    #[django(sort)]
    pub health: Health,
    /// Job limit; defaults to `100`. Marked `unfilterable`.
    #[boulder(default = 100)]
    #[django(unfilterable)]
    pub job_limit: i64,
    /// Optional version string; defaults to `"1.0"`. Marked `unfilterable`.
    #[boulder(default=Some("1.0".to_string()))]
    #[django(unfilterable)]
    pub version: Option<String>,
    /// Optional version string; defaults to `"1.0"`. Marked `unfilterable`.
    // NOTE(review): presumably the master version last reported to this
    // worker - confirm against the API being modelled.
    #[boulder(default=Some("1.0".to_string()))]
    #[django(unfilterable)]
    pub master_version_notified: Option<String>,
}
52
/// Value of the `health` field of [`Worker`].
///
/// `Display` and `EnumString` (from `strum`) convert to and from strings; the
/// tests in this file show the variant names (`"Active"`, `"Retired"`)
/// appearing verbatim in the JSON output.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Display, EnumString)]
pub enum Health {
    Active,
    Maintenance,
    Retired,
}
60
// Empty impls: `Health` takes the default behaviour of both traits.
// NOTE(review): presumably `Scalar` makes the type usable as a filter value
// and `StringCellValue` renders it as a string cell - confirm in django_query.
impl Scalar for Health {}
impl django_query::row::StringCellValue for Health {}
63
/// Value of the `state` field of [`Worker`].
///
/// `Display` and `EnumString` (from `strum`) convert to and from strings; the
/// tests in this file show the variant names (`"Online"`, `"Offline"`)
/// appearing verbatim in the JSON output.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Display, EnumString)]
pub enum State {
    Online,
    Offline,
}
70
// Empty impls: `State` takes the default behaviour of both traits.
// NOTE(review): presumably `Scalar` makes the type usable as a filter value
// and `StringCellValue` renders it as a string cell - confirm in django_query.
impl Scalar for State {}
impl django_query::row::StringCellValue for State {}
73
74#[cfg(test)]
75mod test {
76 use super::*;
77 use crate::SharedState;
78
79 use anyhow::Result;
80 use boulder::BuilderWithPersianRug;
81 use boulder::GeneratorWithPersianRugIterator;
82 use boulder::{Repeat, Some as GSome, Time};
83 use chrono::{DateTime, Duration, Utc};
84 use persian_rug::Proxy;
85 use serde_json::json;
86 use test_log::test;
87
88 async fn make_request<T, U>(server_uri: T, endpoint: U) -> Result<serde_json::Value>
89 where
90 T: AsRef<str>,
91 U: AsRef<str>,
92 {
93 let url = format!("{}/api/v0.2/{}", server_uri.as_ref(), endpoint.as_ref());
94 Ok(reqwest::get(&url).await?.json().await?)
95 }
96
97 #[tokio::test]
98 async fn test_workers() {
99 let mut p = SharedState::new();
100 {
101 let mut m = p.mutate();
102
103 m.add(Worker {
104 _marker: Default::default(),
105 hostname: "test2".to_string(),
106 health: Health::Active,
107 state: State::Online,
108 description: Some("description of worker".to_string()),
109 last_ping: Some(Utc::now()),
110 job_limit: 0,
111 version: None,
112 master_version_notified: None,
113 });
114
115 m.add(Worker {
116 _marker: Default::default(),
117 hostname: "test1".to_string(),
118 health: Health::Maintenance,
119 state: State::Offline,
120 description: Some("description of worker".to_string()),
121 last_ping: Some(Utc::now() - chrono::Duration::seconds(10)),
122 job_limit: 0,
123 version: None,
124 master_version_notified: None,
125 });
126 }
127
128 let server = wiremock::MockServer::start().await;
129
130 wiremock::Mock::given(wiremock::matchers::method("GET"))
131 .and(wiremock::matchers::path("/api/v0.2/workers/"))
132 .respond_with(p.endpoint::<Worker<_>>(Some(&server.uri()), None))
133 .mount(&server)
134 .await;
135
136 let workers = make_request(server.uri(), "workers/")
137 .await
138 .expect("failed to query workers");
139
140 assert_eq!(workers["results"][0]["hostname"], json!("test2"));
141 assert_eq!(workers["results"][1]["hostname"], json!("test1"));
142 assert_eq!(workers["results"].as_array().unwrap().len(), 2);
143 }
144
145 #[tokio::test]
146 async fn test_worker_builder() {
147 let mut p = SharedState::new();
148 {
149 let m = p.mutate();
150
151 let (w, mut m) = Worker::builder().hostname("test2").build(m);
152 m.add(w);
153 let (w, mut m) = Worker::builder().hostname("test1").build(m);
154 m.add(w);
155 }
156
157 let server = wiremock::MockServer::start().await;
158
159 wiremock::Mock::given(wiremock::matchers::method("GET"))
160 .and(wiremock::matchers::path("/api/v0.2/workers/"))
161 .respond_with(p.endpoint::<Worker<_>>(Some(&server.uri()), None))
162 .mount(&server)
163 .await;
164
165 let workers = make_request(server.uri(), "workers/")
166 .await
167 .expect("failed to query workers");
168
169 assert_eq!(workers["results"][0]["hostname"], json!("test2"));
170 assert_eq!(workers["results"][1]["hostname"], json!("test1"));
171 assert_eq!(workers["results"].as_array().unwrap().len(), 2);
172 }
173
174 #[tokio::test]
175 async fn test_worker_stream() {
176 let mut p = SharedState::new();
177 {
178 let m = p.mutate();
179
180 let _ = GeneratorWithPersianRugIterator::new(
181 Proxy::<Worker<crate::state::State>>::generator()
182 .state(Repeat!(State::Offline, State::Online)),
183 m,
184 )
185 .take(2)
186 .collect::<Vec<_>>();
187 }
188
189 let server = wiremock::MockServer::start().await;
190
191 wiremock::Mock::given(wiremock::matchers::method("GET"))
192 .and(wiremock::matchers::path("/api/v0.2/workers/"))
193 .respond_with(p.endpoint::<Worker<_>>(Some(&server.uri()), None))
194 .mount(&server)
195 .await;
196
197 let workers = make_request(server.uri(), "workers/")
198 .await
199 .expect("failed to query workers");
200
201 assert_eq!(workers["results"][0]["hostname"], json!("a-test-worker-1"));
202 assert_eq!(workers["results"][1]["hostname"], json!("a-test-worker-2"));
203 assert_eq!(workers["results"].as_array().unwrap().len(), 2);
204 }
205
206 #[test(tokio::test)]
207 async fn test_output() {
208 let mut p = SharedState::new();
209 {
210 let m = p.mutate();
211
212 let gen = Proxy::<Worker<crate::state::State>>::generator()
213 .state(Repeat!(State::Online, State::Offline))
214 .health(Repeat!(Health::Active, Health::Retired))
215 .last_ping(GSome(Time::new(
216 DateTime::parse_from_rfc3339("2022-04-11T21:00:00-00:00")
217 .unwrap()
218 .with_timezone(&Utc),
219 Duration::minutes(30),
220 )))
221 .job_limit(|| 0)
222 .master_version_notified(|| None);
223
224 let _ = GeneratorWithPersianRugIterator::new(gen, m)
225 .take(5)
226 .collect::<Vec<_>>();
227 }
228
229 let server = wiremock::MockServer::start().await;
230
231 let ep = p.endpoint::<Worker<_>>(Some(&server.uri()), None);
232
233 wiremock::Mock::given(wiremock::matchers::method("GET"))
234 .and(wiremock::matchers::path("/api/v0.2/workers/"))
235 .respond_with(ep)
236 .mount(&server)
237 .await;
238
239 let body = make_request(server.uri(), "workers/?limit=2&offset=1")
240 .await
241 .expect("failed to query workers");
242
243 let next = format!("{}/api/v0.2/workers/?limit=2&offset=3", server.uri());
244 let prev = format!("{}/api/v0.2/workers/?limit=2", server.uri());
245
246 assert_eq!(
247 body,
248 serde_json::json! {
249 {
250 "count": 5,
251 "next": next,
252 "previous": prev,
253 "results": [
254 {
255 "hostname": "a-test-worker-2",
256 "health": "Retired",
257 "state": "Offline",
258 "description": "A test worker",
259 "last_ping": "2022-04-11T21:30:00.000000Z",
260 "job_limit": 0,
261 "version": "1.0",
262 "master_version_notified": null
263 },
264 {
265 "hostname": "a-test-worker-3",
266 "health": "Active",
267 "state": "Online",
268 "description": "A test worker",
269 "last_ping": "2022-04-11T22:00:00.000000Z",
270 "job_limit": 0,
271 "version": "1.0",
272 "master_version_notified": null
273 }
274 ]
275 }
276 }
277 );
278 }
279}