treadmill_rs/connector.rs
1pub use crate::api::switchboard_supervisor::JobInitializingStage;
2pub use crate::api::switchboard_supervisor::RunningJobState;
3pub use crate::api::switchboard_supervisor::StartJobMessage;
4pub use crate::api::switchboard_supervisor::StopJobMessage;
5use crate::api::switchboard_supervisor::{SupervisorEvent, SupervisorJobEvent};
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12#[non_exhaustive]
13pub enum JobErrorKind {
14 /// The requested job is already running and thus cannot be started again.
15 AlreadyRunning,
16
17 /// The requested job is already in the process of being shut down.
18 AlreadyStopping,
19
20 /// A job with this ID was previously running on this supervisor,
21 /// but we weren't asked to `resume` it.
22 JobAlreadyExists,
23
24 /// Cannot resume this job, either because this functionality is
25 /// unsupported or because this particular job cannot be resumed.
26 CannotResume,
27
28 /// Job with the specified ID cannot be found.
29 JobNotFound,
30
31 /// The maximum number of concurrent jobs has been reached.
32 MaxConcurrentJobs,
33
34 /// The requested image cannot be found (either its manifest or a
35 /// resource stated therein cannot be fetched):
36 ImageNotFound,
37
38 /// There is some problem with the image.
39 ImageInvalid,
40
41 /// The image is not compatible with, or does not meet the
42 /// expectations of this supervisor.
43 ImageNotCompatible,
44
45 /// Internal error within the supervisor:
46 InternalError,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct JobError {
51 pub error_kind: JobErrorKind,
52 pub description: String,
53}
54
55/// Supervisor interface for coordinator connectors.
56///
57/// A supervisor interacts with a coordinator through a
58/// [_connector_](SupervisorConnector). These connectors expect to be passed an
59/// instance of [`Supervisor`] to deliver requests and events.
60#[async_trait]
61pub trait Supervisor: std::fmt::Debug + Send + Sync + 'static {
62 /// Start a new job, based on the parameters supplied in the
63 /// `StartJobRequest`.
64 ///
65 /// This method should avoid blocking on long-running operations that should
66 /// be able to be interrupted by other requests (such as stopping a job
67 /// during an image download).
68 ///
69 /// A successful return (`Ok(())`) from this method does not imply that the
70 /// job was started successfully, but merely that there is not an error to
71 /// return at this point. Even after returning from this method, errors can
72 /// be reported through [`SupervisorConnector::report_job_error`].
73 ///
74 /// Implementations should use [`SupervisorConnector::update_job_state`] to
75 /// report on progress while starting or stopping a job, or performing
76 /// similar actions.
77 async fn start_job(this: &Arc<Self>, request: StartJobMessage) -> Result<(), JobError>;
78
79 /// Stop a running job.
80 ///
81 /// A successful return (`Ok(())`) from this method does not imply that the
82 /// job was stopped successfully, but merely that there is not an error to
83 /// return at this point. Even after returning from this method, errors can
84 /// be reported through [`SupervisorConnector::report_job_error`].
85 ///
86 /// Implementations should use [`SupervisorConnector::update_job_state`] to
87 /// report on progress while starting or stopping a job, or performing
88 /// similar actions.
89 async fn stop_job(this: &Arc<Self>, request: StopJobMessage) -> Result<(), JobError>;
90}
91
92/// Connector to a coordinator.
93///
94/// This interface is implemented by all "connectors" that facilitate
95/// interactions between supervisors and coordinators. It allows supervisors
96/// (implementing the [`Supervisor`] trait) to deliver events and issue requests
97/// to a coordinator, for instance to report their current status.
98#[async_trait]
99pub trait SupervisorConnector: std::fmt::Debug + Send + Sync + 'static {
100 /// Start the connector's main loop.
101 ///
102 /// Supervisors are expected to execute this method after performing their
103 /// startup initialization. A connector will return with `Ok(())` when it
104 /// intends the supervisor to shut down, and with `Err(())` in case an error
105 /// occurred communicating with the switchboard. In the latter case,
106 /// supervisors may or may not try to reconnect by calling `run()` in the
107 /// loop.
108 async fn run(&self) -> Result<(), ()>;
109
110 async fn update_event(&self, supervisor_event: SupervisorEvent);
111
112 async fn update_job_state(
113 &self,
114 job_id: Uuid,
115 job_state: RunningJobState,
116 status_message: Option<String>,
117 ) {
118 self.update_event(SupervisorEvent::JobEvent {
119 job_id,
120 event: SupervisorJobEvent::StateTransition {
121 new_state: job_state,
122 status_message,
123 },
124 })
125 .await
126 }
127 async fn report_job_error(&self, job_id: Uuid, error: JobError) {
128 self.update_event(SupervisorEvent::JobEvent {
129 job_id,
130 event: SupervisorJobEvent::Error { error },
131 })
132 .await
133 }
134 // TODO: we'll likely want to remove this method from here, and instead have
135 // supervisors directly interact with log servers to push events. Or have
136 // connectors perform these interactions for them...
137 async fn send_job_console_log(&self, job_id: Uuid, console_bytes: Vec<u8>) {
138 self.update_event(SupervisorEvent::JobEvent {
139 job_id,
140 event: SupervisorJobEvent::ConsoleLog { console_bytes },
141 })
142 .await
143 }
144}