// AMQP connection helpers: broker URL construction, connection retry, and channel creation.
use crate::configuration::config_model::ConnectionProperties as LocalProperties;
use futures::future::{BoxFuture, FutureExt};
use lapin::Channel;
use lapin::{Connection, ConnectionProperties};
use std::{thread, time};

/// Error value returned by the connection helpers in this file.
///
/// Pairs a coarse failure category with the underlying error value of type
/// `T` (the functions in this file instantiate it with `lapin::Error`).
#[derive(Debug)]
pub struct GenericError<T> {
    /// Category describing which operation failed.
    pub why: ErrorType,
    /// The underlying error that triggered this failure.
    pub last_reason: T,
}

/// Failure categories carried in the `why` field of `GenericError`.
#[derive(Debug)]
pub enum ErrorType {
    /// Category that `get_connection` associates with exhausting its retries.
    MaximumConnectionRetriesReached,
    /// Set by `create_channel` when the connection cannot open a channel.
    CannotCreateChannel,
    /// Not constructed anywhere in this file; presumably a catch-all for
    /// callers elsewhere — TODO confirm.
    Unknown,
}

/// Connects to the AMQP broker at `addr`, retrying on failure.
///
/// Every failed attempt is logged to stdout. While the attempt counter has
/// not exceeded `total_retries`, the function waits `attempt` seconds (a
/// linearly growing back-off) and then tries again.
///
/// * `addr` - AMQP URI handed to `Connection::connect`.
/// * `retry` - attempt counter to start from; `create_channel` passes `0`.
/// * `total_retries` - once a failed attempt's counter is greater than this
///   value, the function gives up instead of retrying.
///
/// # Errors
///
/// Returns a [`GenericError`] whose `why` is
/// [`ErrorType::MaximumConnectionRetriesReached`] and whose `last_reason` is
/// the error from the final connection attempt.
pub fn get_connection<'a>(
    addr: &'a str,
    retry: u64,
    total_retries: u64,
) -> BoxFuture<'a, Result<Connection, GenericError<lapin::Error>>> {
    async move {
        let mut attempt = retry;
        loop {
            let outcome = Connection::connect(
                addr,
                ConnectionProperties::default().with_default_executor(8),
            )
            .await;

            let why = match outcome {
                Ok(connection) => return Ok(connection),
                Err(why) => why,
            };
            println!("[{}] - {:?}", line!(), why);

            // Give up once the retry budget is exhausted. The error must be
            // returned explicitly; merely constructing it would let the
            // function keep retrying forever.
            if attempt > total_retries {
                return Err(GenericError {
                    why: ErrorType::MaximumConnectionRetriesReached,
                    last_reason: why,
                });
            }

            // Linear back-off: wait `attempt` seconds before the next try.
            // `saturating_mul` avoids an overflow for very large counters.
            // NOTE(review): `thread::sleep` blocks the thread polling this
            // future; switch to the runtime's async timer if one is available.
            thread::sleep(time::Duration::from_millis(attempt.saturating_mul(1000)));
            attempt += 1;
        }
    }
    .boxed()
}

/// Builds the AMQP connection URL from the given connection properties.
///
/// The `heartbeat` and `connection_timeout` query parameters are optional in
/// an AMQP URI, but they are always emitted here because they make
/// connection failures surface sooner and more precisely.
pub fn build_url(config: LocalProperties) -> String {
    let cfg = &config;
    format!(
        "amqp://{}:{}@{}:{}/{}?heartbeat={}&connection_timeout={}",
        cfg.username,
        cfg.password,
        cfg.host,
        cfg.port,
        cfg.vhost,
        cfg.heartbeat,
        cfg.connection_timeout,
    )
}

/// Opens a channel on a freshly established connection.
///
/// First obtains a connection to `addr` through `get_connection`, starting
/// the attempt counter at `0` and passing `total_retries` through, then asks
/// that connection for a new channel.
///
/// # Errors
///
/// Propagates the error from `get_connection` unchanged. If the connection
/// succeeds but the channel cannot be created, returns a `GenericError` with
/// `ErrorType::CannotCreateChannel` wrapping the underlying `lapin::Error`.
pub async fn create_channel<'a>(
    addr: &'a str,
    total_retries: u64,
) -> Result<Channel, GenericError<lapin::Error>> {
    let connection = get_connection(addr, 0, total_retries).await?;
    let opened = connection.create_channel().await;

    opened.map_err(|why| GenericError {
        why: ErrorType::CannotCreateChannel,
        last_reason: why,
    })
}

// Unit tests for the URL builder.
#[cfg(test)]
mod tests {
    use crate::configuration::config_model::*;
    use crate::consumer::connection_manager::build_url;

    /// Checks the URL that `build_url` produces for
    /// `ConnectionProperties::default()`.
    #[test]
    fn amqp_url_generated_succesfully() {
        let url = build_url(ConnectionProperties::default());
        // NOTE(review): the double slash before `?` presumably comes from a
        // default vhost of "/" appended after the separator — confirm against
        // the `Default` impl in `config_model`.
        assert_eq!(
            "amqp://guest:guest@127.0.0.1:5672//?heartbeat=10&connection_timeout=1000",
            url
        );
    }
}