1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
use super::Config; use crate::kernel::outgoing_processing_result::ProcessingResult; use crate::kernel::CloudEventRoutingArgs; use crate::runtime::channel::BoxedSender; use crate::runtime::{InternalServerFnRef, InternalServerId}; use cloudevents::Event; use serde::Serialize; use std::fmt; /// the unique identifier of the CloudEvent routing attempt /// this id is generated on a receiver per CloudEvent and routing attempt. /// /// If a CloudEvent has to be routed multiple times (e.g., delivery guarantee at least once) and the first routing fails (e.g., connection to outgoing queue was broken), then the message routing has to be retried. /// For this retry a new `CloudEventMessageRoutingId` has to be granted. /// /// This id should not be tried to be interpreted or to be pared. /// The generation is not defined globally and can be done differently by every port implementation. pub type CloudEventMessageRoutingId = String; /// Representation of all events which are exchanged between the components pub enum BrokerEvent { /// The ScheduleInternalServer event tells the Scheduler to schedule a new internal server. /// One event is produced by the Kernel for each component. /// /// # Arguments /// /// * `InternalServerId` - id of the component that should be scheduled /// * `InternalServerFn` - start function of the component that should be scheduled /// ScheduleInternalServer(ScheduleInternalServerStatic), /// The InternalServerScheduled event indicates to the receiver that a new internal server was successfully scheduled. /// The event gets produced by the scheduler after a component was scheduled (because of a ScheduleInternalServer event). /// The receiver is the Kernel. /// /// # Arguments /// /// * `InternalServerId` - id of the component that was scheduled /// * `BoxedSender` - channel inbox of the scheduled component /// InternalServerScheduled(InternalServerId, BoxedSender), /// The Init event indicates to the receiver that it should start interacting with the outside world. /// The event is produced by the Kernel when all components are scheduled. Init, /// The ConfigUpdated event indicates to the receiver that the config has changed and a configuration update should be applied. /// The event is produced by the router and send to the Kernel and then to the component. /// /// # Arguments /// /// * `Config` - the new configurations /// * `InternalServerId` - the component id for which the configurations are meant /// ConfigUpdated(Config, InternalServerId), /// The IncomingCloudEvent event indicates to the receiver that a new CloudEvent has been received from the outside world. /// The event is produced by an input port and is sent to the Kernel. The Kernel sends the same event to the router. IncomingCloudEvent(IncomingCloudEvent), /// The `RoutingResult` is the result of a routing from one `IncomingCloudEvent`. /// The event is sent from the router to the kernel and there forwarded as `OutgoingCloudEvent` to the ports. /// /// The `RoutingResult` doesn't have to be sent if there are no destinations (`Vec<OutgoingCloudEvent>.len() == 0`) RoutingResult(RoutingResult), /// The OutgoingCloudEvent event indicates to the receiver that a CloudEvent has been routed and is ready to be forwarded to the outside world. /// The event is created by the router, send to the Kernel (in a badge as `RoutingResult`) and then to the output port(s). /// One event for every output port which should forward the data is created. OutgoingCloudEvent(OutgoingCloudEvent), /// The OutgoingCloudEvent was processed. /// The OutgoingCloudEventProcessed notifies the kernel about the end of the processing and indicates whether the outcome was successful. /// This response is only used if the `CloudEventRoutingArgs` in the `OutgoingCloudEvent` event indicates that a response is used (`CloudEventRoutingArgs.delivery_guarantee.requires_acknowledgment()`). OutgoingCloudEventProcessed(OutgoingCloudEventProcessed), /// The IncomingCloudEvent was processed. /// The IncomingCloudEventProcessed notifies the receiver port that the routing is completed and a response to the sender can be sent. /// This response is only used if the `CloudEventRoutingArgs` in the `IncomingCloudEvent` event indicates that a response is used (`CloudEventRoutingArgs.delivery_guarantee.requires_acknowledgment()`). /// /// # Arguments /// * `CloudEventMessageRoutingId` - the unique identifier of the CloudEvent routing attempt /// * `OutgoingProcessingResult` - result of the processing, was the processing successful? Error? /// IncomingCloudEventProcessed(CloudEventMessageRoutingId, ProcessingResult), /// The Batch event can be used to make sure a collection of events are processed by the MicroKernel in one batch to prevent race conditions. /// /// # Arguments /// /// * `Vec<BrokerEvent>` - a vector of `BrokerEvent` /// Batch(Vec<BrokerEvent>), /// A health check port sends `HealthCheckRequest` to some components, they should response with `HealthCheckResponse` HealthCheckRequest(HealthCheckRequest), /// response for `HealthCheckRequest`, should go to a health check component HealthCheckResponse(HealthCheckResponse), } impl fmt::Display for BrokerEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BrokerEvent::Init => write!(f, "Init"), BrokerEvent::ScheduleInternalServer(event) => { write!(f, "ScheduleInternalServer server_id={}", event.id) } BrokerEvent::InternalServerScheduled(id, _) => { write!(f, "InternalServerScheduled server_id={}", id) } BrokerEvent::IncomingCloudEvent(event) => { write!(f, "IncomingCloudEvent receiver_id={}", event.incoming_id) } BrokerEvent::RoutingResult(event) => { write!(f, "RoutingResult receiver_id={}", event.incoming_id) } BrokerEvent::ConfigUpdated(_, id) => write!(f, "ConfigUpdated destination_id={}", id), BrokerEvent::OutgoingCloudEvent(event) => write!( f, "OutgoingCloudEvent destination_id={}", event.destination_id ), BrokerEvent::OutgoingCloudEventProcessed(event) => { write!(f, "OutgoingCloudEventProcessed result={}", event.result) } BrokerEvent::IncomingCloudEventProcessed(_, state) => { write!(f, "IncomingCloudEventProcessed state={}", state) } BrokerEvent::Batch(_) => write!(f, "Batch"), BrokerEvent::HealthCheckRequest(_) => write!(f, "HealthCheckRequest"), BrokerEvent::HealthCheckResponse(_) => write!(f, "HealthCheckResponse"), } } } /// Struct for `BrokerEvent::IncomingCloudEvent` #[derive(Clone, Debug, PartialEq)] pub struct IncomingCloudEvent { /// id of the component which received the CloudEvent pub incoming_id: InternalServerId, /// the unique identifier of the CloudEvent routing attempt pub routing_id: CloudEventMessageRoutingId, /// the deserialized CloudEvent that the component has received pub cloud_event: Event, /// routing arguments to define how a CloudEvent should be routed pub args: CloudEventRoutingArgs, } /// Struct for `BrokerEvent::RoutingResult` #[derive(Clone, Debug, PartialEq)] pub struct RoutingResult { /// the id of the component that received the event from the outside world pub incoming_id: InternalServerId, /// the unique identifier of the CloudEvent routing attempt pub routing_id: CloudEventMessageRoutingId, /// The list of events that should be forwarded to the outgoing ports. /// Multiple routing to the same destination_id with a `delivery_guarantee.requires_acknowledgment()` are currently not supported by the kernel. pub routing: Vec<OutgoingCloudEvent>, /// routing arguments to define how a CloudEvent should be routed - this config is used by the kernel; the args for the ports are inside the `Vec<OutgoingCloudEvent>` pub args: CloudEventRoutingArgs, /// outcome of the routing, was it successful? pub result: ProcessingResult, } /// Struct for `BrokerEvent::OutgoingCloudEvent` #[derive(Clone, Debug, PartialEq)] pub struct OutgoingCloudEvent { /// the unique identifier of the CloudEvent routing attempt pub routing_id: CloudEventMessageRoutingId, /// the CloudEvent which should be forwarded pub cloud_event: Event, /// the id of the component that should send the event pub destination_id: InternalServerId, /// routing arguments to define how a CloudEvent should be routed pub args: CloudEventRoutingArgs, } /// Struct for `BrokerEvent::OutgoingCloudEventProcessed` #[derive(Clone, Debug, PartialEq)] pub struct OutgoingCloudEventProcessed { /// the id of the component that processed the event (mostly sent to a queue) pub sender_id: InternalServerId, /// the unique identifier of the CloudEvent routing attempt pub routing_id: CloudEventMessageRoutingId, /// result of the processing, was the processing successful? Error? pub result: ProcessingResult, } /// Struct for `BrokerEvent::ScheduleInternalServer` #[derive(Clone, Debug, PartialEq)] pub struct ScheduleInternalServer<'a> { /// id of the service that should be scheduled pub id: InternalServerId, /// pointer to the start function pub function: InternalServerFnRef<'a>, } /// Struct for `BrokerEvent::HealthCheckRequest` pub struct HealthCheckRequest { /// id of the health check pub id: String, /// the id of the component that created the request pub sender_id: InternalServerId, /// the port that should response to the health check pub destination_id: InternalServerId, } /// Struct for `BrokerEvent::HealthCheckResponse` pub struct HealthCheckResponse { /// id of the health check pub id: String, /// the id of the component that responded to the health request pub sender_id: InternalServerId, /// routing destination of the health response (HealthCheckRequest.sender_id) pub destination_id: InternalServerId, /// status of the component pub status: HealthCheckStatus, } /// health check status #[derive(Debug, PartialEq, Clone, Serialize)] pub enum HealthCheckStatus { /// the component is healthy and fully functional Healthy, /// the component is unhealthy, message to indicate the problem Unhealthy(String), } /// Fixed static lifetime for struct for `BrokerEvent::ScheduleInternalServer` pub type ScheduleInternalServerStatic = ScheduleInternalServer<'static>;