1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use crate::port_sequence_generator::CLOUD_EVENT_TYPE;
use anyhow::{Context, Result};
use cerk::kernel::{BrokerEvent, OutgoingCloudEventProcessed, ProcessingResult};
use cerk::runtime::channel::{BoxedReceiver, BoxedSender};
use cerk::runtime::{InternalServerFn, InternalServerFnRefStatic, InternalServerId};
use chrono::Utc;
use cloudevents::{AttributesReader, Event};
use std::env;

struct ValidationData {
    missing_events: Vec<u32>,
}

fn configure(_data: Option<ValidationData>) -> Result<ValidationData> {
    let amount: u32 = env::var("VALIDATOR_AMOUNT")
        .context("was not able to get env VALIDATOR_AMOUNT")?
        .parse()
        .context("VALIDATOR_AMOUNT was not parsable")?;

    Ok(ValidationData {
        missing_events: (1..=amount).collect(),
    })
}

fn process_event(config: &mut Option<ValidationData>, event: &Event) -> Result<()> {
    let missing_events = &mut config
        .as_mut()
        .ok_or(anyhow!("no config -> can't process event"))?
        .missing_events;
    if event.ty().eq(CLOUD_EVENT_TYPE) {
        let stop = Utc::now();
        let content = event.data().ok_or(anyhow!("was not able to read data"))?;
        if let cloudevents::Data::String(content) = content {
            let number: u32 = content
                .to_string()
                .parse()
                .context("failed to parse data")?;
            if let Some(idx) = missing_events.iter().position(|v| v.eq(&number)) {
                missing_events.remove(idx);
                debug!("event {} was received", number)
            } else {
                debug!(
                    "event {} was received, but not expected, duplicate?",
                    number
                )
            }
            debug!(
                "event {} delay from generation to validator {}ms",
                number,
                (stop - *event.time().ok_or(anyhow!("failed to read event time"))?)
                    .num_milliseconds()
            )
        }
    } else {
        warn!(
            "got CloudEvent of type {}, but only type {} is getting processed",
            event.ty(),
            CLOUD_EVENT_TYPE
        )
    }

    let missing = missing_events.len();
    info!("missing events: {}", missing);
    if missing == 0 {
        info!("**************************************************************************");
        info!("************************** received all events! **************************");
        info!("**************************************************************************");
    }

    Ok(())
}

/// This port validates a sequence of CloudEvent for completion, generated by the `port_sequence_generator`.
/// This port is for testing!
///
/// # Env Options
///
/// * `VALIDATOR_AMOUNT` define the total amount of events that should be generated
///
pub fn port_sequence_validator_start(
    id: InternalServerId,
    inbox: BoxedReceiver,
    sender_to_kernel: BoxedSender,
) {
    let mut data: Option<ValidationData> = None;
    info!("start sequence validator port with id {}", id);
    loop {
        match inbox.receive() {
            BrokerEvent::Init => (),
            BrokerEvent::ConfigUpdated(_, _) => {
                info!("{} received ConfigUpdated", &id);
                match configure(data) {
                    Ok(new_data) => {
                        data = Some(new_data);
                    }
                    Err(e) => {
                        data = None;
                        error!("{} failed to load config {:?}", &id, e)
                    }
                }
            }
            BrokerEvent::OutgoingCloudEvent(event) => {
                let result = match process_event(&mut data, &event.cloud_event) {
                    Err(e) => {
                        error!("{} failed to process event: {:?}", id, e);
                        ProcessingResult::PermanentError
                    }
                    Ok(_) => ProcessingResult::Successful,
                };
                if event.args.delivery_guarantee.requires_acknowledgment() {
                    sender_to_kernel.send(BrokerEvent::OutgoingCloudEventProcessed(
                        OutgoingCloudEventProcessed {
                            routing_id: event.routing_id,
                            result,
                            sender_id: id.clone(),
                        },
                    ))
                }
            }
            broker_event => warn!("event {} not implemented", broker_event),
        }
    }
}

/// This is the pointer for the main function to start the port.
pub static PORT_SEQUENCE_VALIDATOR: InternalServerFnRefStatic =
    &(port_sequence_validator_start as InternalServerFn);

#[cfg(test)]
mod test {
    use super::*;
    use crate::port_sequence_generator::generate_sequence_event;
    use env_logger::Env;

    #[cfg(test)]
    #[ctor::ctor]
    fn init() {
        env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();
    }

    #[test]
    fn check_single_event() -> Result<()> {
        env::set_var("VALIDATOR_AMOUNT", "1");
        let mut data = Some(configure(None)?);
        assert_eq!(data.as_ref().unwrap().missing_events.len(), 1);

        process_event(&mut data, &generate_sequence_event(1))?;
        assert_eq!(data.as_ref().unwrap().missing_events.len(), 0);
        Ok(())
    }
}