treadmill_rs/control_socket.rs
1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use tracing::{Level, event};
5use uuid::Uuid;
6
7use crate::api::supervisor_puppet::{self, JobInfo, PuppetEvent, PuppetReq, SupervisorResp};
8
9/// Supervisor interface for control socket servers.
10///
11/// A puppet connects to a supervisor through a control socket (e.g., TCP or
12/// Unix SeqPacket). These control socket servers will, in turn, deliver
13/// requests and events to supervisors. This trait is implemented by supervisors
14/// to facilitate the delivery of these events.
15#[async_trait]
16pub trait Supervisor: Send + Sync + 'static {
17 /// Puppet SSH keys request.
18 ///
19 /// If the supervisor maintains a set of SSH authorized keys that should be
20 /// deployed on the puppet, it should return with `Some(<authorized keys
21 /// set>)`.
22 ///
23 /// When a supervisor returns `None`, it indicates that this request is not
24 /// supported. A puppet may or may not retry sending SSH keys requests at a
25 /// later point in time when the supervisor ever returned `None`. If a
26 /// supervisor is able to provide SSH authorized keys generally, but
27 /// currently has none configured, it should instead return `Some(vec![])`.
28 async fn ssh_keys(&self, host_id: Uuid, job_id: Uuid) -> Option<Vec<String>>;
29
30 /// Puppet network configuration request.
31 ///
32 /// Hosts may obtain their network configuration through various means, such
33 /// as using DHCP, IPv6 RA, or through static configuration. However, this
34 /// request endpoint can be used by puppets that do not have another means
35 /// to obtain their network configuration, and where a network connection is
36 /// not required to connect to the puppet control socket (such as with
37 /// UnixSeqpacket for Linux containers or VirtIO channels in a QEMU VM).
38 ///
39 /// If this endpoint is supported by the supervisor, it must at least supply
40 /// the host's hostname, and may optionally provide IPv4 and IPv6 addresses,
41 /// gateways, and a DNS server.
42 async fn network_config(
43 &self,
44 host_id: Uuid,
45 job_id: Uuid,
46 ) -> Option<supervisor_puppet::NetworkConfig>;
47
48 /// Puppet job parameters request.
49 ///
50 /// If the supervisor deems that this job is currently active, it should
51 /// respond with the full set of parameters supplied by the coordinator.
52 ///
53 /// Returning `None` implies that this job id is currently not active. If a
54 /// job has no parameters defined, the `parameters` field should be an empty
55 /// `HashMap`.
56 async fn parameters(
57 &self,
58 host_id: Uuid,
59 job_id: Uuid,
60 ) -> Option<HashMap<String, supervisor_puppet::ParameterValue>>;
61
62 /// Generic request handler.
63 ///
64 /// The default implementation of this method calls out to the other methods
65 /// of this trait and should normally not need to be overriden. This method
66 /// is to be used by control socket server implementations.
67 async fn handle_request(
68 &self,
69 _request_id: u64,
70 req: PuppetReq,
71 host_id: Uuid,
72 job_id: Uuid,
73 ) -> SupervisorResp {
74 match req {
75 PuppetReq::Ping => SupervisorResp::PingResp,
76
77 PuppetReq::JobInfo => SupervisorResp::JobInfo(JobInfo { job_id, host_id }),
78
79 PuppetReq::SSHKeys => SupervisorResp::SSHKeysResp {
80 ssh_keys: self
81 .ssh_keys(host_id, job_id)
82 .await
83 .unwrap_or_else(std::vec::Vec::new),
84 },
85
86 PuppetReq::Parameters => self
87 .parameters(host_id, job_id)
88 .await
89 .map_or(SupervisorResp::JobNotFound, |parameters| {
90 SupervisorResp::Parameters { parameters }
91 }),
92
93 PuppetReq::NetworkConfig => self
94 .network_config(host_id, job_id)
95 .await
96 .map_or(SupervisorResp::JobNotFound, SupervisorResp::NetworkConfig),
97 // Would be required for consumers of this type outside of this
98 // crate, as it's marked with `#[non_exhaustive]`. However here we
99 // can exhaustively list all request types and force compile-errors
100 // if we add some in the future.
101 //
102 // _ => SupervisorResp::UnsupportedRequest,
103 }
104 }
105
106 async fn handle_event(
107 &self,
108 puppet_event_id: u64,
109 event: PuppetEvent,
110 host_id: Uuid,
111 job_id: Uuid,
112 ) {
113 match event {
114 PuppetEvent::Ready => self.puppet_ready(puppet_event_id, host_id, job_id).await,
115
116 PuppetEvent::Shutdown {
117 supervisor_event_id,
118 } => {
119 self.puppet_shutdown(puppet_event_id, supervisor_event_id, host_id, job_id)
120 .await
121 }
122
123 PuppetEvent::Reboot {
124 supervisor_event_id,
125 } => {
126 self.puppet_reboot(puppet_event_id, supervisor_event_id, host_id, job_id)
127 .await
128 }
129
130 PuppetEvent::TerminateJob {
131 supervisor_event_id,
132 } => {
133 self.terminate_job(puppet_event_id, supervisor_event_id, host_id, job_id)
134 .await
135 }
136
137 PuppetEvent::RunCommandError { .. }
138 | PuppetEvent::RunCommandOutput { .. }
139 | PuppetEvent::RunCommandExitCode { .. } => {
140 event!(
141 Level::WARN,
142 ?job_id,
143 puppet_event_id,
144 "Received unhandled puppet event: {:?}",
145 event,
146 )
147 } // Would be required for consumers of this type outside of this
148 // crate, as it's marked with `#[non_exhaustive]`. However here we
149 // can exhaustively list all event types and force compile-errors
150 // if we add some in the future.
151 //
152 // _ => warn!(Unhandled puppet event!),
153 }
154 }
155
156 async fn puppet_ready(&self, puppet_event_id: u64, host_id: Uuid, job_id: Uuid);
157 async fn puppet_shutdown(
158 &self,
159 puppet_event_id: u64,
160 supervisor_event_id: Option<u64>,
161 host_id: Uuid,
162 job_id: Uuid,
163 );
164 async fn puppet_reboot(
165 &self,
166 puppet_event_id: u64,
167 supervisor_event_id: Option<u64>,
168 host_id: Uuid,
169 job_id: Uuid,
170 );
171
172 /// Puppet requests job to be terminated.
173 async fn terminate_job(
174 &self,
175 puppet_event_id: u64,
176 supervisor_event_id: Option<u64>,
177 host_id: Uuid,
178 job_id: Uuid,
179 );
180}