treadmill_rs/
control_socket.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use tracing::{Level, event};
5use uuid::Uuid;
6
7use crate::api::supervisor_puppet::{self, JobInfo, PuppetEvent, PuppetReq, SupervisorResp};
8
9/// Supervisor interface for control socket servers.
10///
11/// A puppet connects to a supervisor through a control socket (e.g., TCP or
12/// Unix SeqPacket). These control socket servers will, in turn, deliver
13/// requests and events to supervisors. This trait is implemented by supervisors
14/// to facilitate the delivery of these events.
15#[async_trait]
16pub trait Supervisor: Send + Sync + 'static {
17    /// Puppet SSH keys request.
18    ///
19    /// If the supervisor maintains a set of SSH authorized keys that should be
20    /// deployed on the puppet, it should return with `Some(<authorized keys
21    /// set>)`.
22    ///
23    /// When a supervisor returns `None`, it indicates that this request is not
24    /// supported. A puppet may or may not retry sending SSH keys requests at a
25    /// later point in time when the supervisor ever returned `None`. If a
26    /// supervisor is able to provide SSH authorized keys generally, but
27    /// currently has none configured, it should instead return `Some(vec![])`.
28    async fn ssh_keys(&self, host_id: Uuid, job_id: Uuid) -> Option<Vec<String>>;
29
30    /// Puppet network configuration request.
31    ///
32    /// Hosts may obtain their network configuration through various means, such
33    /// as using DHCP, IPv6 RA, or through static configuration. However, this
34    /// request endpoint can be used by puppets that do not have another means
35    /// to obtain their network configuration, and where a network connection is
36    /// not required to connect to the puppet control socket (such as with
37    /// UnixSeqpacket for Linux containers or VirtIO channels in a QEMU VM).
38    ///
39    /// If this endpoint is supported by the supervisor, it must at least supply
40    /// the host's hostname, and may optionally provide IPv4 and IPv6 addresses,
41    /// gateways, and a DNS server.
42    async fn network_config(
43        &self,
44        host_id: Uuid,
45        job_id: Uuid,
46    ) -> Option<supervisor_puppet::NetworkConfig>;
47
48    /// Puppet job parameters request.
49    ///
50    /// If the supervisor deems that this job is currently active, it should
51    /// respond with the full set of parameters supplied by the coordinator.
52    ///
53    /// Returning `None` implies that this job id is currently not active. If a
54    /// job has no parameters defined, the `parameters` field should be an empty
55    /// `HashMap`.
56    async fn parameters(
57        &self,
58        host_id: Uuid,
59        job_id: Uuid,
60    ) -> Option<HashMap<String, supervisor_puppet::ParameterValue>>;
61
62    /// Generic request handler.
63    ///
64    /// The default implementation of this method calls out to the other methods
65    /// of this trait and should normally not need to be overriden. This method
66    /// is to be used by control socket server implementations.
67    async fn handle_request(
68        &self,
69        _request_id: u64,
70        req: PuppetReq,
71        host_id: Uuid,
72        job_id: Uuid,
73    ) -> SupervisorResp {
74        match req {
75            PuppetReq::Ping => SupervisorResp::PingResp,
76
77            PuppetReq::JobInfo => SupervisorResp::JobInfo(JobInfo { job_id, host_id }),
78
79            PuppetReq::SSHKeys => SupervisorResp::SSHKeysResp {
80                ssh_keys: self
81                    .ssh_keys(host_id, job_id)
82                    .await
83                    .unwrap_or_else(std::vec::Vec::new),
84            },
85
86            PuppetReq::Parameters => self
87                .parameters(host_id, job_id)
88                .await
89                .map_or(SupervisorResp::JobNotFound, |parameters| {
90                    SupervisorResp::Parameters { parameters }
91                }),
92
93            PuppetReq::NetworkConfig => self
94                .network_config(host_id, job_id)
95                .await
96                .map_or(SupervisorResp::JobNotFound, SupervisorResp::NetworkConfig),
97            // Would be required for consumers of this type outside of this
98            // crate, as it's marked with `#[non_exhaustive]`. However here we
99            // can exhaustively list all request types and force compile-errors
100            // if we add some in the future.
101            //
102            // _ => SupervisorResp::UnsupportedRequest,
103        }
104    }
105
106    async fn handle_event(
107        &self,
108        puppet_event_id: u64,
109        event: PuppetEvent,
110        host_id: Uuid,
111        job_id: Uuid,
112    ) {
113        match event {
114            PuppetEvent::Ready => self.puppet_ready(puppet_event_id, host_id, job_id).await,
115
116            PuppetEvent::Shutdown {
117                supervisor_event_id,
118            } => {
119                self.puppet_shutdown(puppet_event_id, supervisor_event_id, host_id, job_id)
120                    .await
121            }
122
123            PuppetEvent::Reboot {
124                supervisor_event_id,
125            } => {
126                self.puppet_reboot(puppet_event_id, supervisor_event_id, host_id, job_id)
127                    .await
128            }
129
130            PuppetEvent::TerminateJob {
131                supervisor_event_id,
132            } => {
133                self.terminate_job(puppet_event_id, supervisor_event_id, host_id, job_id)
134                    .await
135            }
136
137            PuppetEvent::RunCommandError { .. }
138            | PuppetEvent::RunCommandOutput { .. }
139            | PuppetEvent::RunCommandExitCode { .. } => {
140                event!(
141                    Level::WARN,
142                    ?job_id,
143                    puppet_event_id,
144                    "Received unhandled puppet event: {:?}",
145                    event,
146                )
147            } // Would be required for consumers of this type outside of this
148              // crate, as it's marked with `#[non_exhaustive]`. However here we
149              // can exhaustively list all event types and force compile-errors
150              // if we add some in the future.
151              //
152              // _ => warn!(Unhandled puppet event!),
153        }
154    }
155
156    async fn puppet_ready(&self, puppet_event_id: u64, host_id: Uuid, job_id: Uuid);
157    async fn puppet_shutdown(
158        &self,
159        puppet_event_id: u64,
160        supervisor_event_id: Option<u64>,
161        host_id: Uuid,
162        job_id: Uuid,
163    );
164    async fn puppet_reboot(
165        &self,
166        puppet_event_id: u64,
167        supervisor_event_id: Option<u64>,
168        host_id: Uuid,
169        job_id: Uuid,
170    );
171
172    /// Puppet requests job to be terminated.
173    async fn terminate_job(
174        &self,
175        puppet_event_id: u64,
176        supervisor_event_id: Option<u64>,
177        host_id: Uuid,
178        job_id: Uuid,
179    );
180}