1use boulder::{BuildableWithPersianRug, GeneratableWithPersianRug};
2use boulder::{Inc, Some as GSome, Time};
3use chrono::{DateTime, Duration, Utc};
4use django_query::{
5 filtering::FilterableWithPersianRug, row::IntoRowWithPersianRug,
6 sorting::SortableWithPersianRug,
7};
8use persian_rug::{contextual, Context, Proxy};
9use strum::{Display, EnumString};
10
11use crate::devices::Device;
12use crate::devicetypes::{Alias, Architecture, BitWidth, Core, DeviceType, ProcessorFamily};
13use crate::tags::Tag;
14use crate::users::{Group, User};
15use crate::workers::Worker;
16
17#[derive(
25 Clone,
26 Debug,
27 FilterableWithPersianRug,
28 SortableWithPersianRug,
29 IntoRowWithPersianRug,
30 BuildableWithPersianRug,
31 GeneratableWithPersianRug,
32)]
33#[django(
34 persian_rug(
35 context=C,
36 access(
37 Alias<C>,
38 Architecture<C>,
39 BitWidth<C>,
40 Core<C>,
41 Device<C>,
42 DeviceType<C>,
43 Group<C>,
44 Job<C>,
45 ProcessorFamily<C>,
46 Tag<C>,
47 User<C>,
48 Worker<C>
49 )
50 )
51)]
52#[boulder(
53 persian_rug(
54 context=C,
55 access(
56 Alias<C>,
57 Architecture<C>,
58 BitWidth<C>,
59 Core<C>,
60 Device<C>,
61 DeviceType<C>,
62 Group<C>,
63 Job<C>,
64 ProcessorFamily<C>,
65 Tag<C>,
66 User<C>,
67 Worker<C>
68 )
69 )
70)]
71#[contextual(C)]
72pub struct Job<C: Context + 'static> {
73 #[boulder(generator=Inc(0))]
74 #[django(op(lt, gt, in), sort)]
75 pub id: i64,
76 #[boulder(buildable_with_persian_rug, generatable_with_persian_rug)]
77 #[django(traverse, foreign_key = "username")]
78 pub submitter: Proxy<User<C>>,
79 #[boulder(generatable_with_persian_rug, sequence = 3usize)]
80 #[django(traverse, foreign_key = "id")]
81 pub viewing_groups: Vec<Proxy<Group<C>>>,
82 #[boulder(default = "Example job description")]
84 #[django(op(in, contains, icontains, startswith, endswith))]
85 pub description: String,
86 #[boulder(default = true)]
87 pub health_check: bool,
88 #[boulder(buildable_with_persian_rug, generatable_with_persian_rug)]
89 #[django(traverse, foreign_key = "name")]
90 pub requested_device_type: Option<Proxy<DeviceType<C>>>,
91 #[boulder(generatable_with_persian_rug, sequence = 4usize)]
92 #[django(traverse, foreign_key = "id")]
93 pub tags: Vec<Proxy<Tag<C>>>,
94 #[boulder(buildable_with_persian_rug, generatable_with_persian_rug)]
95 #[django(traverse, foreign_key = "hostname")]
96 pub actual_device: Option<Proxy<Device<C>>>,
97 #[boulder(default=Some(DateTime::parse_from_rfc3339("2022-03-17T17:00:00-00:00").unwrap().with_timezone(&Utc)),
98 generator=GSome(Time::new(DateTime::parse_from_rfc3339("2022-03-17T17:00:00-00:00").unwrap().with_timezone(&Utc),
99 Duration::minutes(1))))]
100 #[django(op(gt, lt, isnull), sort)]
101 pub submit_time: Option<DateTime<Utc>>,
102 #[django(op(gt, lt, isnull), sort)]
103 pub start_time: Option<DateTime<Utc>>,
104 #[django(op(gt, lt, isnull), sort)]
105 pub end_time: Option<DateTime<Utc>>,
106 #[boulder(default=State::Submitted)]
107 #[django(op(iexact, in))]
108 pub state: State,
109 #[boulder(default=Health::Unknown)]
110 #[django(op(iexact, in))]
111 pub health: Health,
112 #[django(op(in, lt, gt, lte, gte))]
113 pub priority: i64,
114 #[boulder(default = "Example job definition")]
115 #[django(op(in, contains, icontains, startswith, endswith))]
116 pub definition: String,
117 #[boulder(default = "Example job original definition")]
118 #[django(op(in, contains, icontains, startswith, endswith))]
119 pub original_definition: String,
120 #[boulder(default = "Example job multinode definition")]
121 #[django(op(in, contains, icontains, startswith, endswith))]
122 pub multinode_definition: String,
123 #[django(traverse, foreign_key = "id")]
124 pub failure_tags: Vec<Proxy<Tag<C>>>,
125 #[django(op(in, contains, icontains, startswith, endswith, isnull))]
126 pub failure_comment: Option<String>,
127}
128
129#[derive(Copy, Clone, Debug, PartialEq, Eq, EnumString, Display)]
131pub enum Health {
132 Unknown,
133 Complete,
134 Incomplete,
135 Canceled,
136}
137
138impl django_query::filtering::ops::Scalar for Health {}
139impl django_query::row::StringCellValue for Health {}
140
141#[derive(Copy, Clone, Debug, PartialEq, Eq, EnumString, Display)]
143pub enum State {
144 Submitted,
145 Scheduling,
146 Scheduled,
147 Running,
148 Canceling,
149 Finished,
150}
151
152impl django_query::filtering::ops::Scalar for State {}
153impl django_query::row::StringCellValue for State {}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158
159 use anyhow::Result;
160 use boulder::Repeat;
161 use boulder::{
162 BuildableWithPersianRug, BuilderWithPersianRug, GeneratorWithPersianRugIterator,
163 };
164 use serde_json::{json, Value};
165 use test_log::test;
166
167 async fn make_request<T, U>(server_uri: T, endpoint: U) -> Result<Value>
168 where
169 T: AsRef<str>,
170 U: AsRef<str>,
171 {
172 let url = format!("{}/api/v0.2/{}", server_uri.as_ref(), endpoint.as_ref());
173 Ok(reqwest::get(&url).await?.json().await?)
174 }
175
176 #[tokio::test]
177 async fn test_jobs() {
178 let mut p = crate::state::SharedState::new();
179 {
180 let m = p.mutate();
181
182 let (submitter, m) = Proxy::<User<_>>::builder().username("fred").build(m);
183 let (device_type, mut m) = Proxy::<DeviceType<_>>::builder().name("big one").build(m);
184 m.add(Job {
185 id: 1,
186 submitter,
187 viewing_groups: Vec::new(),
188 description: "A job submitted by Fred".to_string(),
189 health_check: false,
190 requested_device_type: Some(device_type),
191 tags: Vec::new(),
192 actual_device: None,
193 submit_time: Some(Utc::now()),
194 start_time: None,
195 end_time: None,
196 state: State::Scheduled,
197 health: Health::Unknown,
198 priority: 1,
199 definition: "/bin/some_stuff".to_string(),
200 original_definition: "/usr/bin/other_stuff".to_string(),
201 multinode_definition: String::new(),
202 failure_tags: Vec::new(),
203 failure_comment: None,
204 });
205
206 let (submitter, m) = Proxy::<User<_>>::builder().username("jane").build(m);
207 let (device_type, mut m) = Proxy::<DeviceType<_>>::builder().name("anything").build(m);
208 m.add(Job {
209 id: 2,
210 submitter,
211 viewing_groups: Vec::new(),
212 description: "A job submitted by Jane".to_string(),
213 health_check: false,
214 requested_device_type: Some(device_type),
215 tags: Vec::new(),
216 actual_device: None,
217 submit_time: Some(Utc::now()),
218 start_time: None,
219 end_time: None,
220 state: State::Submitted,
221 health: Health::Incomplete,
222 priority: 1,
223 definition: "/bin/some_stuff".to_string(),
224 original_definition: "/usr/bin/other_stuff".to_string(),
225 multinode_definition: String::new(),
226 failure_tags: Vec::new(),
227 failure_comment: None,
228 });
229 }
230
231 let server = wiremock::MockServer::start().await;
232
233 wiremock::Mock::given(wiremock::matchers::method("GET"))
234 .and(wiremock::matchers::path("/api/v0.2/jobs/"))
235 .respond_with(p.endpoint::<Job<_>>(Some(&server.uri()), None))
236 .mount(&server)
237 .await;
238
239 let jobs = make_request(server.uri(), "jobs/")
240 .await
241 .expect("failed to query jobs");
242
243 assert_eq!(jobs["results"][0]["id"], json!(1));
244 assert_eq!(jobs["results"][1]["id"], json!(2));
245 assert_eq!(jobs["results"].as_array().unwrap().len(), 2);
246 }
247
248 #[tokio::test]
249 async fn test_job_builder() {
250 let mut p = crate::state::SharedState::new();
251 {
252 let m = p.mutate();
253
254 let (submitter, m) = Proxy::<User<_>>::builder().username("fred").build(m);
255 let (device_type, m) = Proxy::<DeviceType<_>>::builder().name("big one").build(m);
256 let (job, mut m) = Job::builder()
257 .id(1)
258 .submitter(submitter)
259 .requested_device_type(device_type)
260 .state(State::Scheduled)
261 .start_time(None)
262 .build(m);
263 m.add(job);
264
265 let (submitter, m) = Proxy::<User<_>>::builder().username("jane").build(m);
266 let (device_type, m) = Proxy::<DeviceType<_>>::builder().name("anything").build(m);
267 let (job, mut m) = Job::builder()
268 .id(2)
269 .submitter(submitter)
270 .requested_device_type(device_type)
271 .state(State::Submitted)
272 .start_time(None)
273 .build(m);
274 m.add(job);
275 }
276
277 let server = wiremock::MockServer::start().await;
278
279 wiremock::Mock::given(wiremock::matchers::method("GET"))
280 .and(wiremock::matchers::path("/api/v0.2/jobs/"))
281 .respond_with(p.endpoint::<Job<_>>(Some(&server.uri()), None))
282 .mount(&server)
283 .await;
284
285 let jobs = make_request(server.uri(), "jobs/")
286 .await
287 .expect("failed to query jobs");
288
289 assert_eq!(jobs["results"][0]["id"], json!(1));
290 assert_eq!(jobs["results"][1]["id"], json!(2));
291 assert_eq!(jobs["results"].as_array().unwrap().len(), 2);
292 }
293
294 #[tokio::test]
295 async fn test_job_stream() {
296 let mut p = crate::state::SharedState::new();
297 {
298 let m = p.mutate();
299 let (user1, m) = Proxy::<User<_>>::builder().username("fred").build(m);
300 let (user2, m) = Proxy::<User<_>>::builder().username("jane").build(m);
301 let _ = GeneratorWithPersianRugIterator::new(
302 Proxy::<Job<crate::state::State>>::generator().submitter(Repeat!(user1, user2)),
303 m,
304 )
305 .take(2)
306 .collect::<Vec<_>>();
307 }
308
309 let server = wiremock::MockServer::start().await;
310
311 wiremock::Mock::given(wiremock::matchers::method("GET"))
312 .and(wiremock::matchers::path("/api/v0.2/jobs/"))
313 .respond_with(p.endpoint::<Job<_>>(Some(&server.uri()), None))
314 .mount(&server)
315 .await;
316
317 let jobs = make_request(server.uri(), "jobs/")
318 .await
319 .expect("failed to query jobs");
320
321 assert_eq!(jobs["results"][0]["id"], json!(0));
322 assert_eq!(jobs["results"][1]["id"], json!(1));
323 assert_eq!(jobs["results"].as_array().unwrap().len(), 2);
324 }
325
326 #[test(tokio::test)]
327 async fn test_output() {
328 let mut p = crate::state::SharedState::new();
329 {
330 let m = p.mutate();
331
332 let gen = Proxy::<Job<crate::state::State>>::generator()
333 .state(|| State::Finished)
334 .health(|| Health::Complete)
335 .submit_time(GSome(Time::new(
336 DateTime::parse_from_rfc3339("2022-04-11T05:00:00-00:00")
337 .unwrap()
338 .with_timezone(&Utc),
339 Duration::minutes(5),
340 )))
341 .start_time(GSome(Time::new(
342 DateTime::parse_from_rfc3339("2022-04-11T05:30:00-00:00")
343 .unwrap()
344 .with_timezone(&Utc),
345 Duration::minutes(5),
346 )))
347 .end_time(GSome(Time::new(
348 DateTime::parse_from_rfc3339("2022-04-11T06:00:00-00:00")
349 .unwrap()
350 .with_timezone(&Utc),
351 Duration::minutes(5),
352 )))
353 .tags(Vec::new)
354 .viewing_groups(Vec::new)
355 .multinode_definition(String::new)
356 .health_check(Repeat!(false, true))
357 .priority(Repeat!(0, 50));
358
359 let _ = GeneratorWithPersianRugIterator::new(gen, m)
360 .take(4)
361 .collect::<Vec<_>>();
362 }
363
364 let server = wiremock::MockServer::start().await;
365
366 wiremock::Mock::given(wiremock::matchers::method("GET"))
367 .and(wiremock::matchers::path("/api/v0.2/jobs/"))
368 .respond_with(p.endpoint::<Job<_>>(Some(&server.uri()), None))
369 .mount(&server)
370 .await;
371
372 let body = make_request(server.uri(), "jobs/?limit=2")
373 .await
374 .expect("failed to query jobs");
375
376 let next = format!("{}/api/v0.2/jobs/?limit=2&offset=2", server.uri());
377
378 assert_eq!(
379 body,
380 serde_json::json! {
381 {
382 "count": 4,
383 "next": next,
384 "previous": null,
385 "results": [
386 {
387 "id": 0,
388 "submitter": "test-user-1",
389 "viewing_groups": [
390
391 ],
392 "description": "Example job description",
393 "health_check": false,
394 "requested_device_type": "test-device-type-0",
395 "tags": [
396
397 ],
398 "actual_device": "test-device-0",
399 "submit_time": "2022-04-11T05:00:00.000000Z",
400 "start_time": "2022-04-11T05:30:00.000000Z",
401 "end_time": "2022-04-11T06:00:00.000000Z",
402 "state": "Finished",
403 "health": "Complete",
404 "priority": 0,
405 "definition": "Example job definition",
406 "original_definition": "Example job original definition",
407 "multinode_definition": "",
408 "failure_tags": [
409
410 ],
411 "failure_comment": null
412 },
413 {
414 "id": 1,
415 "submitter": "test-user-2",
416 "viewing_groups": [
417
418 ],
419 "description": "Example job description",
420 "health_check": true,
421 "requested_device_type": "test-device-type-1",
422 "tags": [
423
424 ],
425 "actual_device": "test-device-1",
426 "submit_time": "2022-04-11T05:05:00.000000Z",
427 "start_time": "2022-04-11T05:35:00.000000Z",
428 "end_time": "2022-04-11T06:05:00.000000Z",
429 "state": "Finished",
430 "health": "Complete",
431 "priority": 50,
432 "definition": "Example job definition",
433 "original_definition": "Example job original definition",
434 "multinode_definition": "",
435 "failure_tags": [
436
437 ],
438 "failure_comment": null
439 },
440 ]
441 }
442 }
443 );
444 }
445}