treadmill_rs/api/
switchboard_supervisor.rs

//! Types used in the interface between the switchboard (coordinator) and
//! supervisor components.
3
4use crate::connector::JobError;
5use crate::image::manifest::ImageId;
6use crate::util::chrono::duration as human_duration;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
/// Well-known string constants for the websocket transport between the
/// switchboard and the supervisor.
pub mod websocket {
    /// Protocol identifier for version 1 of the Treadmill websocket protocol.
    /// NOTE(review): presumably negotiated as the websocket subprotocol — confirm.
    pub static TREADMILL_WEBSOCKET_PROTOCOL: &str = "treadmillv1";

    /// Identifier associated with the socket configuration.
    /// NOTE(review): presumably the key/header carrying a `SocketConfig` — confirm.
    pub static TREADMILL_WEBSOCKET_CONFIG: &str = "tml-socket-config";
}
15
16// -- (Configuration)  -----------------------------------------------------------------------------
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SocketConfig {
20    /// PING-PONG keepalive configuration.
21    pub keepalive: KeepaliveConfig,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct KeepaliveConfig {
26    /// How often the switchboard should send PING messages to the supervisor.
27    #[serde(with = "human_duration")]
28    pub ping_interval: chrono::TimeDelta,
29    /// Minimum time without a PONG response before the switchboard closes the connection.
30    #[serde(with = "human_duration")]
31    pub keepalive: chrono::TimeDelta,
32}
33
34// -- StartJobRequest ------------------------------------------------------------------------------
35
36#[derive(Serialize, Deserialize, Clone)]
37pub struct ParameterValue {
38    pub value: String,
39    pub secret: bool,
40}
41
42impl std::fmt::Debug for ParameterValue {
43    /// Custom implementation of [`std::fmt::Debug`] for [`ParameterValue`] to
44    /// avoid leaking secrets in logs:
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
46        let mut debug_struct = f.debug_struct("ParameterValue");
47        debug_struct.field("secret", &self.secret);
48
49        // TODO: Requires nightly feature debug_closure_helpers
50        // debug_struct.field_with("value", |f| {
51        //     if self.secret {
52        //         write!(f, "***")
53        //     } else {
54        //         <String as std::fmt::Debug>::fmt(&self.value, f)
55        //     }
56        // });
57
58        // For now, print the secret as if it were a string (with
59        // quotation marks) with contents "***":
60        debug_struct.field("value", if self.secret { &"***" } else { &self.value });
61
62        debug_struct.finish()
63    }
64}
65
66#[derive(Serialize, Deserialize, Debug, Clone)]
67#[serde(rename_all = "snake_case")]
68pub struct RendezvousServerSpec {
69    pub client_id: Uuid,
70    pub server_base_url: String,
71    pub auth_token: String,
72}
73#[derive(Serialize, Deserialize, Debug, Clone)]
74#[serde(tag = "type")]
75#[serde(rename_all = "snake_case")]
76pub enum ImageSpecification {
77    /// Whether to resume a previously started job.
78    ResumeJob { job_id: Uuid },
79
80    /// Which image to base this job off. If the image is not locally cached
81    /// at the supervisor, it will be fetched using its manifest prior to
82    /// executing the job.
83    ///
84    /// Images are content-addressed by the SHA-256 digest of their
85    /// manifest.
86    ///
87    /// Note that if a job is being restarted, it will use this variant.
88    Image { image_id: ImageId },
89}
90#[derive(Serialize, Deserialize, Debug, Clone)]
91#[serde(rename_all = "snake_case")]
92pub struct RestartPolicy {
93    pub remaining_restart_count: usize,
94}
95#[derive(Serialize, Deserialize, Debug, Clone)]
96#[serde(rename_all = "snake_case")]
97pub struct StartJobMessage {
98    /// Unique identifier of the job to be started.
99    ///
100    /// To restart a previously failed or interrupted job, pass the same ID
101    /// in as the old job and set `resume_job = true`. The supervisor may
102    /// refuse to start a job with a re-used ID if `resume_job` is
103    /// deasserted, and must refuse to start a job when it cannot be resumed
104    /// and `resume_job` is asserted.
105    pub job_id: Uuid,
106
107    pub image_spec: ImageSpecification,
108
109    /// The set of initial SSH keys to deploy onto the image.
110    ///
111    /// The image's configuration of the Treadmill puppet daemon determines
112    /// how and whether these keys will be loaded.
113    pub ssh_keys: Vec<String>,
114
115    pub restart_policy: RestartPolicy,
116
117    /// A set of SSH rendezvous servers to tunnel inbound SSH connections
118    /// through. Leave empty to avoid using SSH rendezvouz
119    /// servers. Supervisors may not support this, in which case they will
120    /// not report back any SSH endpoints reachable through the rendezvous
121    /// endpoints listed here:
122    pub ssh_rendezvous_servers: Vec<RendezvousServerSpec>,
123
124    /// A hash map of parameters provided to this job execution. These
125    /// parameters are provided to the puppet daemon.
126    pub parameters: HashMap<String, ParameterValue>,
127}
128
129// -- StopJobRequest -------------------------------------------------------------------------------
130
131#[derive(Serialize, Deserialize, Debug, Clone)]
132#[serde(rename_all = "snake_case")]
133pub struct StopJobMessage {
134    /// Unique identifier of the job to be stopped:
135    pub job_id: Uuid,
136}
137
138// -- Job/Supervisor Status ------------------------------------------------------------------------
139
140#[derive(Serialize, Deserialize, Debug, Clone)]
141#[serde(rename_all = "snake_case")]
142pub enum JobInitializingStage {
143    /// Generic starting stage, for when no other stage is applicable:
144    Starting,
145
146    /// Fetching the specified image:
147    FetchingImage,
148
149    /// Acquiring resources, such as the root file system, to launch the
150    /// board environment.
151    Allocating,
152
153    /// Provisioning the environment, such as making any changes to the base
154    /// system according to the user-provided customizations.
155    Provisioning,
156
157    /// The host is booting. The next transition should
158    /// either be into the `Ready` or `Failed` states.
159    Booting,
160}
161// #[derive(Serialize, Deserialize, Debug, Clone)]
162// #[serde(rename_all = "snake_case")]
163// pub enum JobSessionConnectionInfo {
164//     #[serde(rename = "direct_ssh")]
165//     DirectSSH {
166//         hostname: String,
167//         port: u16,
168//         host_key_fingerprints: Vec<String>,
169//     },
170//     #[serde(rename = "rendezvous_ssh")]
171//     RendezvousSSH {
172//         hostname: String,
173//         port: u16,
174//         host_key_fingerprints: Vec<String>,
175//     },
176// }
177#[derive(Serialize, Deserialize, Debug, Clone)]
178#[serde(tag = "state")]
179#[serde(rename_all = "snake_case")]
180pub enum RunningJobState {
181    Initializing { stage: JobInitializingStage },
182    Ready,
183    // Ready { connection_info: Vec<JobSessionConnectionInfo>, },
184    Terminating,
185    Terminated,
186}
187#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
188#[serde(rename_all = "snake_case")]
189pub enum JobUserExitStatus {
190    Success,
191    Error,
192    Unknown,
193}
194#[derive(Serialize, Deserialize, Debug, Clone)]
195#[serde(tag = "type")]
196#[serde(rename_all = "snake_case")]
197pub enum SupervisorJobEvent {
198    StateTransition {
199        new_state: RunningJobState,
200        status_message: Option<String>,
201    },
202    DeclareExitStatus {
203        user_exit_status: JobUserExitStatus,
204        host_output: Option<String>,
205    },
206    // Technically a state transition
207    Error {
208        error: JobError,
209    },
210    ConsoleLog {
211        console_bytes: Vec<u8>,
212    },
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(tag = "type")]
217#[serde(rename_all = "snake_case")]
218pub enum ReportedSupervisorStatus {
219    OngoingJob {
220        job_id: Uuid,
221        job_state: RunningJobState,
222    },
223    Idle,
224}
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type")]
227#[serde(rename_all = "snake_case")]
228pub enum SupervisorEvent {
229    JobEvent {
230        job_id: Uuid,
231        event: SupervisorJobEvent,
232    },
233}
234
235// -- General Request/Response ---------------------------------------------------------------------
236
237#[derive(Serialize, Deserialize, Debug, Clone)]
238#[serde(rename_all = "snake_case")]
239pub struct Request<T> {
240    pub request_id: Uuid,
241    pub message: T,
242}
243#[derive(Serialize, Deserialize, Debug, Clone)]
244#[serde(rename_all = "snake_case")]
245pub struct Response<T> {
246    pub response_to_request_id: Uuid,
247    pub message: T,
248}
249#[derive(Serialize, Deserialize, Debug, Clone)]
250#[serde(rename_all = "snake_case")]
251#[serde(tag = "type", content = "message")]
252pub enum Message {
253    StartJob(StartJobMessage),
254
255    StopJob(StopJobMessage),
256
257    StatusRequest(Request<()>),
258    StatusResponse(Response<ReportedSupervisorStatus>),
259
260    SupervisorEvent(SupervisorEvent),
261}
262impl Message {
263    pub fn request_id(&self) -> Option<Uuid> {
264        match self {
265            Message::StatusRequest(r) => Some(r.request_id),
266            Message::StartJob(_)
267            | Message::StopJob(_)
268            | Message::StatusResponse(_)
269            | Message::SupervisorEvent(_) => None,
270        }
271    }
272    pub fn to_response_message(self) -> Result<Response<ResponseMessage>, Message> {
273        match self {
274            Message::StatusResponse(Response {
275                response_to_request_id,
276                message,
277            }) => Ok(Response {
278                response_to_request_id,
279                message: ResponseMessage::StatusResponse(message),
280            }),
281            x => Err(x),
282        }
283    }
284}
285#[derive(Debug)]
286#[non_exhaustive]
287pub enum ResponseMessage {
288    StatusResponse(ReportedSupervisorStatus),
289}