1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use anyhow::anyhow;
use chrono::Utc;
use plane_core::{
messages::agent::DroneStatusMessage,
messages::scheduler::{ScheduleRequest, ScheduleResponse},
nats::TypedNats,
NeverResult,
};
use scheduler::Scheduler;
use tokio::select;
mod config;
mod dns;
mod plan;
pub mod run;
mod scheduler;
pub mod ttl_store;
pub async fn run_scheduler(nats: TypedNats) -> NeverResult {
let scheduler = Scheduler::default();
let mut spawn_request_sub = nats.subscribe(ScheduleRequest::subscribe_subject()).await?;
tracing::info!("Subscribed to spawn requests.");
let mut status_sub = nats
.subscribe_jetstream(DroneStatusMessage::subscribe_subject())
.await?;
tracing::info!("Subscribed to drone status messages.");
loop {
select! {
status_msg = status_sub.next() => {
tracing::debug!(?status_msg, "Got drone status");
if let Some(status_msg) = status_msg? {
scheduler.update_status(Utc::now(), &status_msg);
} else {
return Err(anyhow!("status_sub.next() returned None."));
}
},
spawn_request = spawn_request_sub.next() => {
match spawn_request {
Ok(Some(schedule_request)) => {
tracing::info!(spawn_request=?schedule_request.value, "Got spawn request");
let result = match scheduler.schedule(&schedule_request.value.cluster, Utc::now()) {
Ok(drone_id) => {
let spawn_request = schedule_request.value.schedule(&drone_id);
match nats.request(&spawn_request).await {
Ok(false) => {
tracing::warn!("No drone available.");
ScheduleResponse::NoDroneAvailable
},
Err(error) => {
tracing::warn!(?error, "Scheduler returned error.");
ScheduleResponse::NoDroneAvailable
},
Ok(true) => ScheduleResponse::Scheduled { drone: drone_id, backend_id: spawn_request.backend_id }
}
},
Err(error) => {
tracing::warn!(?error, "Communication error during scheduling.");
ScheduleResponse::NoDroneAvailable
},
};
schedule_request.respond(&result).await?;
},
Ok(None) => return Err(anyhow!("spawn_request_sub.next() returned None.")),
Err(err) => tracing::warn!("spawn_request_sub.next() returned error: {:?}", err)
}
}
}
}
}