maelstrom_base/
proto.rs

1//! Messages sent between various binaries.
2
3use crate::{
4    stats::BrokerStatistics, ArtifactUploadLocation, ClientJobId, JobBrokerStatus, JobId,
5    JobOutcomeResult, JobSpec, JobWorkerStatus, Sha256Digest,
6};
7use bincode::Options;
8use serde::{Deserialize, Serialize};
9
10/// The first message sent by a connector to the broker. It identifies what the connector is, and
11/// provides any relevant information.
12#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
13pub enum Hello {
14    Client,
15    Worker { slots: u32 },
16    Monitor,
17    ArtifactPusher,
18    ArtifactFetcher,
19}
20
21/// Message sent from the broker to a worker. The broker won't send a message until it has received
22/// a [`Hello`] and determined the type of its interlocutor.
23#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
24#[allow(clippy::large_enum_variant)]
25pub enum BrokerToWorker {
26    EnqueueJob(JobId, JobSpec),
27    CancelJob(JobId),
28}
29
30/// Message sent from a worker to the broker. These are responses to previous
31/// [`BrokerToWorker::EnqueueJob`] messages. After sending the initial [`Hello`], a worker will
32/// send a stream of these messages.
33#[derive(Clone, Debug, Deserialize, PartialEq, Eq, PartialOrd, Ord, Serialize)]
34pub enum WorkerToBroker {
35    JobResponse(JobId, JobOutcomeResult),
36    JobStatusUpdate(JobId, JobWorkerStatus),
37}
38
39/// Message sent from the broker to a client. The broker won't send a message until it has received
40/// a [`Hello`] and determined the type of its interlocutor.
41#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
42pub enum BrokerToClient {
43    JobResponse(ClientJobId, JobOutcomeResult),
44    JobStatusUpdate(ClientJobId, JobBrokerStatus),
45    TransferArtifact(Sha256Digest),
46    GeneralError(String),
47}
48
49/// Message sent from a client to the broker. After sending the initial [`Hello`], a client will
50/// send a stream of these messages.
51#[allow(clippy::large_enum_variant)]
52#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
53pub enum ClientToBroker {
54    JobRequest(ClientJobId, JobSpec),
55    ArtifactTransferred(Sha256Digest, ArtifactUploadLocation),
56}
57
58/// Message sent from the broker to a monitor. The broker won't send a message until it has
59/// recevied a [`Hello`] and determined the type of its interlocutor.
60#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
61pub enum BrokerToMonitor {
62    /// Response to a [`MonitorToBroker::StatisticsRequest`].
63    StatisticsResponse(BrokerStatistics),
64}
65
66/// Message sent from a monitor to the broker. After sending the initial [`Hello`], a monitor will
67/// send a stream of these messages.
68#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
69pub enum MonitorToBroker {
70    /// Request a [`BrokerToMonitor::StatisticsResponse`] with the current statistics.
71    StatisticsRequest,
72
73    /// Tell the broker to stop immediately. It will just shut down, closing all outstanding
74    /// connections.
75    StopRequest,
76}
77
78/// Message sent from the broker to an artifact fetcher. This will be in response to an
79/// [`ArtifactFetcherToBroker`] message. On failure to get the artifact, the result contains
80/// details about what went wrong. After a failure, the broker will close the artifact fetcher
81/// connection.
82#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
83pub struct BrokerToArtifactFetcher(pub Result<u64, String>);
84
85/// Message sent from an artifact fetcher to the broker. It will be answered with a
86/// [`BrokerToArtifactFetcher`].
87#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
88pub struct ArtifactFetcherToBroker(pub Sha256Digest);
89
90/// Message sent from the broker to an artifact pusher. This will be in response to an
91/// [`ArtifactPusherToBroker`] message and the artifact's body. On success, the message contains no
92/// other details, indicating that the artifact was successfully written to disk, and that the
93/// digest matched. On failure, this message contains details about what went wrong. After a
94/// failure, the broker will close the artifact pusher connection.
95#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
96pub struct BrokerToArtifactPusher(pub Result<(), String>);
97
98/// Message sent from an artifact pusher to the broker. It contains the digest and size of the
99/// artifact. The body of the artifact will immediately follow this message. It will be answered
100/// with a [`BrokerToArtifactPusher`].
101#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
102pub struct ArtifactPusherToBroker(pub Sha256Digest, pub u64);
103
104fn bincode() -> impl Options {
105    bincode::options().with_big_endian()
106}
107
108pub fn serialize<T: ?Sized + Serialize>(value: &T) -> bincode::Result<Vec<u8>> {
109    bincode().serialize(value)
110}
111
112pub fn serialize_into<W: std::io::Write, T: ?Sized + Serialize>(
113    writer: W,
114    value: &T,
115) -> bincode::Result<()> {
116    bincode().serialize_into(writer, value)
117}
118
119pub fn serialized_size<T: ?Sized + Serialize>(value: &T) -> bincode::Result<u64> {
120    bincode().serialized_size(value)
121}
122
123pub fn deserialize<'a, T: Deserialize<'a>>(bytes: &'a [u8]) -> bincode::Result<T> {
124    bincode().deserialize(bytes)
125}