1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
use super::Config;
use crate::kernel::outgoing_processing_result::ProcessingResult;
use crate::kernel::CloudEventRoutingArgs;
use crate::runtime::channel::BoxedSender;
use crate::runtime::{InternalServerFnRef, InternalServerId};
use cloudevents::Event;
use serde::Serialize;
use std::fmt;

/// the unique identifier of the CloudEvent routing attempt
/// this id is generated on a receiver per CloudEvent and routing attempt.
///
/// If a CloudEvent has to be routed multiple times (e.g., delivery guarantee at least once) and the first routing fails (e.g., connection to outgoing queue was broken), then the message routing has to be retried.
/// For this retry a new  `CloudEventMessageRoutingId` has to be granted.
///
/// This id should not be tried to be interpreted or to be pared.
/// The generation is not defined globally and can be done differently by every port implementation.
pub type CloudEventMessageRoutingId = String;

/// Representation of all events which are exchanged between the components
pub enum BrokerEvent {
    /// The ScheduleInternalServer event tells the Scheduler to schedule a new internal server.
    /// One event is produced by the Kernel for each component.
    ///
    /// # Arguments
    ///
    /// * `InternalServerId` - id of the component that should be scheduled
    /// * `InternalServerFn` - start function of the component that should be scheduled
    ///
    ScheduleInternalServer(ScheduleInternalServerStatic),

    /// The InternalServerScheduled event indicates to the receiver that a new internal server was successfully scheduled.
    /// The event gets produced by the scheduler after a component was scheduled (because of a ScheduleInternalServer event).
    /// The receiver is the Kernel.
    ///
    /// # Arguments
    ///
    /// * `InternalServerId` - id of the component that was scheduled
    /// * `BoxedSender` - channel inbox of the scheduled component
    ///
    InternalServerScheduled(InternalServerId, BoxedSender),

    /// The Init event indicates to the receiver that it should start interacting with the outside world.
    /// The event is produced by the Kernel when all components are scheduled.
    Init,

    /// The ConfigUpdated event indicates to the receiver that the config has changed and a configuration update should be applied.
    /// The event is produced by the router and send to the Kernel and then to the component.
    ///
    /// # Arguments
    ///
    /// * `Config` - the new configurations
    /// * `InternalServerId` - the component id for which the configurations are meant
    ///
    ConfigUpdated(Config, InternalServerId),

    /// The IncomingCloudEvent event indicates to the receiver that a new CloudEvent has been received from the outside world.
    /// The event is produced by an input port and is sent to the Kernel. The Kernel sends the same event to the router.
    IncomingCloudEvent(IncomingCloudEvent),

    /// The `RoutingResult` is the result of a routing from one `IncomingCloudEvent`.
    /// The event is sent from the router to the kernel and there forwarded as `OutgoingCloudEvent` to the ports.
    ///
    /// The `RoutingResult` doesn't have to be sent if there are no destinations (`Vec<OutgoingCloudEvent>.len() == 0`)
    RoutingResult(RoutingResult),

    /// The OutgoingCloudEvent event indicates to the receiver that a CloudEvent has been routed and is ready to be forwarded to the outside world.
    /// The event is created by the router, send to the Kernel (in a badge as `RoutingResult`) and then to the output port(s).
    /// One event for every output port which should forward the data is created.
    OutgoingCloudEvent(OutgoingCloudEvent),

    /// The OutgoingCloudEvent was processed.
    /// The OutgoingCloudEventProcessed notifies the kernel about the end of the processing and indicates whether the outcome was successful.
    /// This response is only used if the `CloudEventRoutingArgs` in the `OutgoingCloudEvent` event indicates  that a response is used  (`CloudEventRoutingArgs.delivery_guarantee.requires_acknowledgment()`).
    OutgoingCloudEventProcessed(OutgoingCloudEventProcessed),

    /// The IncomingCloudEvent was processed.
    /// The IncomingCloudEventProcessed notifies the receiver port that the routing is completed and a response to the sender can be sent.
    /// This response is only used if the `CloudEventRoutingArgs` in the `IncomingCloudEvent` event indicates that a response is used (`CloudEventRoutingArgs.delivery_guarantee.requires_acknowledgment()`).
    ///
    /// # Arguments
    /// * `CloudEventMessageRoutingId` - the unique identifier of the CloudEvent routing attempt
    /// * `OutgoingProcessingResult` - result of the processing, was the processing successful? Error?
    ///
    IncomingCloudEventProcessed(CloudEventMessageRoutingId, ProcessingResult),

    /// The Batch event can be used to make sure a collection of events are processed by the MicroKernel in one batch to prevent race conditions.
    ///
    /// # Arguments
    ///
    /// * `Vec<BrokerEvent>` - a vector of `BrokerEvent`
    ///
    Batch(Vec<BrokerEvent>),

    /// A health check port sends `HealthCheckRequest` to some components, they should response with `HealthCheckResponse`
    HealthCheckRequest(HealthCheckRequest),

    /// response for `HealthCheckRequest`, should go to a health check component
    HealthCheckResponse(HealthCheckResponse),
}

impl fmt::Display for BrokerEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BrokerEvent::Init => write!(f, "Init"),
            BrokerEvent::ScheduleInternalServer(event) => {
                write!(f, "ScheduleInternalServer server_id={}", event.id)
            }
            BrokerEvent::InternalServerScheduled(id, _) => {
                write!(f, "InternalServerScheduled server_id={}", id)
            }
            BrokerEvent::IncomingCloudEvent(event) => {
                write!(f, "IncomingCloudEvent receiver_id={}", event.incoming_id)
            }
            BrokerEvent::RoutingResult(event) => {
                write!(f, "RoutingResult receiver_id={}", event.incoming_id)
            }
            BrokerEvent::ConfigUpdated(_, id) => write!(f, "ConfigUpdated destination_id={}", id),
            BrokerEvent::OutgoingCloudEvent(event) => write!(
                f,
                "OutgoingCloudEvent destination_id={}",
                event.destination_id
            ),
            BrokerEvent::OutgoingCloudEventProcessed(event) => {
                write!(f, "OutgoingCloudEventProcessed result={}", event.result)
            }
            BrokerEvent::IncomingCloudEventProcessed(_, state) => {
                write!(f, "IncomingCloudEventProcessed state={}", state)
            }
            BrokerEvent::Batch(_) => write!(f, "Batch"),
            BrokerEvent::HealthCheckRequest(_) => write!(f, "HealthCheckRequest"),
            BrokerEvent::HealthCheckResponse(_) => write!(f, "HealthCheckResponse"),
        }
    }
}

/// Struct for `BrokerEvent::IncomingCloudEvent`
#[derive(Clone, Debug, PartialEq)]
pub struct IncomingCloudEvent {
    /// id of the component which received the CloudEvent
    pub incoming_id: InternalServerId,
    /// the unique identifier of the CloudEvent routing attempt
    pub routing_id: CloudEventMessageRoutingId,
    /// the deserialized CloudEvent that the component has received
    pub cloud_event: Event,
    /// routing arguments to define how a CloudEvent should be routed
    pub args: CloudEventRoutingArgs,
}

/// Struct for `BrokerEvent::RoutingResult`
#[derive(Clone, Debug, PartialEq)]
pub struct RoutingResult {
    /// the id of the component that received the event from the outside world
    pub incoming_id: InternalServerId,
    /// the unique identifier of the CloudEvent routing attempt
    pub routing_id: CloudEventMessageRoutingId,
    /// The list of events that should be forwarded to the outgoing ports.
    /// Multiple routing to the same destination_id with a `delivery_guarantee.requires_acknowledgment()` are currently not supported by the kernel.
    pub routing: Vec<OutgoingCloudEvent>,
    /// routing arguments to define how a CloudEvent should be routed - this config is used by the kernel; the args for the ports are inside the `Vec<OutgoingCloudEvent>`
    pub args: CloudEventRoutingArgs,
    /// outcome of the routing, was it successful?
    pub result: ProcessingResult,
}

/// Struct for `BrokerEvent::OutgoingCloudEvent`
#[derive(Clone, Debug, PartialEq)]
pub struct OutgoingCloudEvent {
    /// the unique identifier of the CloudEvent routing attempt
    pub routing_id: CloudEventMessageRoutingId,
    /// the CloudEvent which should be forwarded
    pub cloud_event: Event,
    /// the id of the component that should send the event
    pub destination_id: InternalServerId,
    /// routing arguments to define how a CloudEvent should be routed
    pub args: CloudEventRoutingArgs,
}

/// Struct for `BrokerEvent::OutgoingCloudEventProcessed`
#[derive(Clone, Debug, PartialEq)]
pub struct OutgoingCloudEventProcessed {
    /// the id of the component that processed the event (mostly sent to a queue)
    pub sender_id: InternalServerId,
    /// the unique identifier of the CloudEvent routing attempt
    pub routing_id: CloudEventMessageRoutingId,
    /// result of the processing, was the processing successful? Error?
    pub result: ProcessingResult,
}

/// Struct for `BrokerEvent::ScheduleInternalServer`
#[derive(Clone, Debug, PartialEq)]
pub struct ScheduleInternalServer<'a> {
    /// id of the service that should be scheduled
    pub id: InternalServerId,
    /// pointer to the start function
    pub function: InternalServerFnRef<'a>,
}

/// Struct for `BrokerEvent::HealthCheckRequest`
pub struct HealthCheckRequest {
    /// id of the health check
    pub id: String,
    /// the id of the component that created the request
    pub sender_id: InternalServerId,
    /// the port that should response to the health check
    pub destination_id: InternalServerId,
}

/// Struct for `BrokerEvent::HealthCheckResponse`
pub struct HealthCheckResponse {
    /// id of the health check
    pub id: String,
    /// the id of the component that responded to the health request
    pub sender_id: InternalServerId,
    /// routing destination of the health response (HealthCheckRequest.sender_id)
    pub destination_id: InternalServerId,
    /// status of the component
    pub status: HealthCheckStatus,
}

/// health check status
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum HealthCheckStatus {
    /// the component is healthy and fully functional
    Healthy,
    /// the component is unhealthy, message to indicate the problem
    Unhealthy(String),
}

/// Fixed static lifetime for struct for `BrokerEvent::ScheduleInternalServer`
pub type ScheduleInternalServerStatic = ScheduleInternalServer<'static>;