1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
use std::io;
pub mod listener;
pub mod message;
pub mod publisher;
mod utils;
use crate::listener::EventBusListener;
use crate::publisher::EventBusPublisher;
use std::net::{TcpStream, ToSocketAddrs};

/// Connects to an event bus TCP endpoint and returns the two halves of the client.
///
/// The stream is connected to `address`, switched to non-blocking mode and then
/// shared between both halves: the [`EventBusPublisher`] takes the stream itself,
/// the [`EventBusListener`] takes a handle cloned from it.
///
/// # Errors
///
/// Returns the underlying [`io::Error`] when the connection cannot be established,
/// when the stream cannot be switched to non-blocking mode or cloned, or when
/// building either the publisher or the listener fails.
pub fn eventbus<A: ToSocketAddrs>(address: A) -> io::Result<(EventBusPublisher, EventBusListener)> {
    let socket = TcpStream::connect(address)?;
    // Configure the stream before cloning it: per the standard library docs, a
    // cloned handle refers to the same stream and options set on one handle are
    // propagated to the other.
    socket.set_nonblocking(true)?;
    // Handle given to the listener (control messages: ping/pong ; register / unregister).
    let control_socket = socket.try_clone()?;
    // The original handle goes to the publisher (used by the API user to publish /
    // send outgoing messages); no extra clone is needed since nothing else keeps it.
    Ok((
        EventBusPublisher::new(socket)?,
        EventBusListener::new(control_socket)?,
    ))
}

#[cfg(test)]
mod tests {
    // Integration tests for the event bus client. Every test except
    // `connect_to_an_unexisting_address_should_fail` starts a Docker container
    // running the mock event bus server image and talks to it over TCP.
    use crate::eventbus;
    use crate::message::{Message, SendMessage};
    use serde_json::json;
    use testcontainers::images::generic::{GenericImage, WaitFor};
    use testcontainers::*;

    /// Describes the mock event bus server image; the container is considered
    /// ready once "TCP bridge connected" shows up on its stdout.
    fn mock_eventbus_server() -> GenericImage {
        GenericImage::new("aesteve/tests:mock-eventbus-server")
            .with_wait_for(WaitFor::message_on_stdout("TCP bridge connected"))
    }

    /// Starts a mock server container, looks up the host port mapped to container
    /// port 7542 and runs `test` with the resulting `localhost:<port>` address.
    ///
    /// The Docker client and the container handle stay in scope until `test`
    /// returns, so the server is available for the whole duration of the closure.
    fn with_mock_server(test: impl FnOnce(String)) {
        let docker = clients::Cli::default();
        let node = docker.run(mock_eventbus_server());
        let host_port = node
            .get_host_port(7542)
            .expect("Mock event bus server implementation needs to be up before running tests");
        let addr = format!("localhost:{}", host_port);
        println!("Mock server running on {}", addr);
        test(addr);
    }

    /// A ping can be sent through the publisher half.
    #[test]
    fn test_ping() {
        with_mock_server(|addr| {
            let (mut publisher, _) = eventbus(addr).expect("Event bus creation must not fail");
            publisher
                .ping()
                .expect("Should be able to send ping to the server");
        });
    }

    /// Consuming "out-address" yields three messages with pairwise distinct
    /// bodies, after which the consumer can be unregistered.
    #[test]
    fn consumer_test() {
        with_mock_server(|addr| {
            let (_, mut listener) = eventbus(addr).expect("Event bus creation must not fail");
            let mut consumer = listener.consumer("out-address".to_string()).unwrap();
            let mut received_msgs = Vec::new();
            // NOTE(review): polls without a timeout; the test hangs if the server
            // never delivers three messages.
            while received_msgs.len() < 3 {
                if let Some(Ok(msg)) = consumer.next() {
                    assert!(
                        !received_msgs
                            .iter()
                            .any(|m: &Message| m.body == msg.body),
                        "same message must not be received twice"
                    );
                    received_msgs.push(msg);
                }
            }
            listener
                .unregister_consumer("out-address".to_string())
                .expect("Unregistering consumer must not fail");
        });
    }

    /// A message sent to "echo-address" with a reply address comes back on that
    /// reply address carrying the same body.
    #[test]
    fn send_reply_pattern() {
        with_mock_server(|addr| {
            let (mut publisher, mut listener) =
                eventbus(addr).expect("Event bus creation must not fail");
            let reply_address = "the-reply-address";
            let mut consumer = listener.consumer(reply_address.to_string()).unwrap();
            let payload = json!({"test": "value"});
            let expected_payload = payload.clone();
            publisher
                .send(SendMessage {
                    address: "echo-address".to_string(),
                    reply_address: Some(reply_address.to_string()),
                    body: Some(payload),
                    headers: None,
                })
                .expect("Sending a message to the event bus must work fine");
            // Poll until the first successfully decoded message arrives (no timeout).
            loop {
                if let Some(Ok(msg)) = consumer.next() {
                    assert_eq!(reply_address, msg.address);
                    assert_eq!(
                        expected_payload,
                        msg.body.expect("Body should be extracted")
                    );
                    break;
                }
            }
        });
    }

    /// Publishing a message to "in-address" succeeds.
    #[test]
    fn pub_sub_pattern() {
        with_mock_server(|addr| {
            let (mut publisher, _) = eventbus(addr).expect("Event bus creation must not fail");
            let payload = json!({"test": "value"});
            publisher
                .publish(Message {
                    address: "in-address".to_string(),
                    body: Some(payload),
                    headers: None,
                })
                .expect("Publishing a message to the event bus must work fine");
        });
    }

    /// Sending to "error-address" results in an error message whose text is
    /// "FORBIDDEN" on the listener's error stream.
    #[test]
    fn test_errors() {
        with_mock_server(|addr| {
            let (mut publisher, mut listener) =
                eventbus(addr).expect("Event bus creation must not fail");
            let payload = json!({"test": "value"});
            publisher
                .send(SendMessage {
                    address: "error-address".to_string(),
                    reply_address: Some("the-reply-address".to_string()),
                    body: Some(payload),
                    headers: None,
                })
                .expect("Publishing a message to the event bus must work fine");
            let mut errors = listener.errors().expect("Can listen to errors");
            // Poll until the first successfully decoded error arrives (no timeout).
            loop {
                if let Some(Ok(error_msg)) = errors.next() {
                    assert_eq!(error_msg.message, "FORBIDDEN".to_string());
                    break;
                }
            }
        });
    }

    /// Creating an event bus from an address that cannot be connected to fails.
    #[test]
    fn connect_to_an_unexisting_address_should_fail() {
        // NOTE(review): the address contains a double colon, so this exercises a
        // malformed address rather than a refused connection — confirm whether
        // "127.0.0.1:1111" was intended.
        let eb = eventbus("127.0.0.1::1111");
        assert!(eb.is_err());
    }

    /// Subscribing to "something_we_dont_have_access_to" produces an error
    /// message containing "denied" on the listener's error stream.
    #[test]
    fn should_be_notified_of_errors() {
        with_mock_server(|addr| {
            let (_, mut listener) = eventbus(addr).expect("Event bus creation must not fail");
            let mut error_listener = listener
                .errors()
                .expect("Can ask for an iterator over error messages");
            // Only the subscription attempt matters here; the returned consumer is
            // discarded right away, as in the original statement.
            listener
                .consumer("something_we_dont_have_access_to".to_string())
                .expect("Can subscribe to any address");
            // Poll until the first successfully decoded error arrives (no timeout).
            loop {
                if let Some(Ok(error_msg)) = error_listener.next() {
                    assert!(error_msg.message.contains("denied"));
                    break;
                }
            }
        });
    }
}