treadmill_rs/
connector.rs

1pub use crate::api::switchboard_supervisor::JobInitializingStage;
2pub use crate::api::switchboard_supervisor::RunningJobState;
3pub use crate::api::switchboard_supervisor::StartJobMessage;
4pub use crate::api::switchboard_supervisor::StopJobMessage;
5use crate::api::switchboard_supervisor::{SupervisorEvent, SupervisorJobEvent};
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12#[non_exhaustive]
13pub enum JobErrorKind {
14    /// The requested job is already running and thus cannot be started again.
15    AlreadyRunning,
16
17    /// The requested job is already in the process of being shut down.
18    AlreadyStopping,
19
20    /// A job with this ID was previously running on this supervisor,
21    /// but we weren't asked to `resume` it.
22    JobAlreadyExists,
23
24    /// Cannot resume this job, either because this functionality is
25    /// unsupported or because this particular job cannot be resumed.
26    CannotResume,
27
28    /// Job with the specified ID cannot be found.
29    JobNotFound,
30
31    /// The maximum number of concurrent jobs has been reached.
32    MaxConcurrentJobs,
33
34    /// The requested image cannot be found (either its manifest or a
35    /// resource stated therein cannot be fetched):
36    ImageNotFound,
37
38    /// There is some problem with the image.
39    ImageInvalid,
40
41    /// The image is not compatible with, or does not meet the
42    /// expectations of this supervisor.
43    ImageNotCompatible,
44
45    /// Internal error within the supervisor:
46    InternalError,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct JobError {
51    pub error_kind: JobErrorKind,
52    pub description: String,
53}
54
/// Supervisor interface for coordinator connectors.
///
/// A supervisor interacts with a coordinator through a
/// [_connector_](SupervisorConnector). These connectors expect to be passed an
/// instance of [`Supervisor`] to deliver requests and events.
///
/// Both functions take the supervisor as `this: &Arc<Self>` rather than
/// `&self`. They are therefore associated functions, called as
/// `S::start_job(&supervisor, request)` on a supervisor held in an [`Arc`].
#[async_trait]
pub trait Supervisor: std::fmt::Debug + Send + Sync + 'static {
    /// Start a new job, based on the parameters supplied in the
    /// [`StartJobMessage`].
    ///
    /// This method should avoid blocking on long-running operations that should
    /// be able to be interrupted by other requests (such as stopping a job
    /// during an image download).
    ///
    /// A successful return (`Ok(())`) from this method does not imply that the
    /// job was started successfully, but merely that there is not an error to
    /// return at this point. Even after returning from this method, errors can
    /// be reported through [`SupervisorConnector::report_job_error`].
    ///
    /// Implementations should use [`SupervisorConnector::update_job_state`] to
    /// report on progress while starting or stopping a job, or performing
    /// similar actions.
    ///
    /// # Errors
    ///
    /// Returns a [`JobError`] when the request can already be rejected at the
    /// time this function returns.
    async fn start_job(this: &Arc<Self>, request: StartJobMessage) -> Result<(), JobError>;

    /// Stop a running job, based on the parameters supplied in the
    /// [`StopJobMessage`].
    ///
    /// A successful return (`Ok(())`) from this method does not imply that the
    /// job was stopped successfully, but merely that there is not an error to
    /// return at this point. Even after returning from this method, errors can
    /// be reported through [`SupervisorConnector::report_job_error`].
    ///
    /// Implementations should use [`SupervisorConnector::update_job_state`] to
    /// report on progress while starting or stopping a job, or performing
    /// similar actions.
    ///
    /// # Errors
    ///
    /// Returns a [`JobError`] when the request can already be rejected at the
    /// time this function returns.
    async fn stop_job(this: &Arc<Self>, request: StopJobMessage) -> Result<(), JobError>;
}
91
92/// Connector to a coordinator.
93///
94/// This interface is implemented by all "connectors" that facilitate
95/// interactions between supervisors and coordinators. It allows supervisors
96/// (implementing the [`Supervisor`] trait) to deliver events and issue requests
97/// to a coordinator, for instance to report their current status.
98#[async_trait]
99pub trait SupervisorConnector: std::fmt::Debug + Send + Sync + 'static {
100    /// Start the connector's main loop.
101    ///
102    /// Supervisors are expected to execute this method after performing their
103    /// startup initialization. A connector will return with `Ok(())` when it
104    /// intends the supervisor to shut down, and with `Err(())` in case an error
105    /// occurred communicating with the switchboard. In the latter case,
106    /// supervisors may or may not try to reconnect by calling `run()` in the
107    /// loop.
108    async fn run(&self) -> Result<(), ()>;
109
110    async fn update_event(&self, supervisor_event: SupervisorEvent);
111
112    async fn update_job_state(
113        &self,
114        job_id: Uuid,
115        job_state: RunningJobState,
116        status_message: Option<String>,
117    ) {
118        self.update_event(SupervisorEvent::JobEvent {
119            job_id,
120            event: SupervisorJobEvent::StateTransition {
121                new_state: job_state,
122                status_message,
123            },
124        })
125        .await
126    }
127    async fn report_job_error(&self, job_id: Uuid, error: JobError) {
128        self.update_event(SupervisorEvent::JobEvent {
129            job_id,
130            event: SupervisorJobEvent::Error { error },
131        })
132        .await
133    }
134    // TODO: we'll likely want to remove this method from here, and instead have
135    // supervisors directly interact with log servers to push events. Or have
136    // connectors perform these interactions for them...
137    async fn send_job_console_log(&self, job_id: Uuid, console_bytes: Vec<u8>) {
138        self.update_event(SupervisorEvent::JobEvent {
139            job_id,
140            event: SupervisorJobEvent::ConsoleLog { console_bytes },
141        })
142        .await
143    }
144}