1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
extern crate nats;

use nats::Client;

use std::net::ToSocketAddrs;
use std::net::IpAddr;

fn resolve(host: &str) -> Result<String, &'static str> {
    match (host, 0).to_socket_addrs() {
        Ok(ips) => {
            let ips: Vec<IpAddr> = ips.map(|address| address.ip()).collect();
            if ips.len() > 0 {
                let ip = ips[0].to_string();
                Ok(ip)
            } else {
                println!("Error lookup host - host not found");
                Err("Error lookup host - host not found")
            }
        },
        Err(e) => {
            println!("Error lookup host {}",e);
            Err("Error lookup host")
        }
    }
}

pub struct MessengerEvent {
  pub msg: String,
  pub subject: String,
  pub inbox: Option<String>,
}

#[derive(Debug)]
pub struct MessengerClient {
    client: Client,
}

impl MessengerClient {
    pub fn new(hosts: String, name: String) -> Result<MessengerClient, &'static str> {
        let hosts_parsed: Vec<String> = hosts.split(',')
            .map(|host| {
                let host = String::from(host);
                let len = host.len();
                let beta_offset = host.find(':').unwrap_or(len);
                let port = host.get(beta_offset..len).unwrap_or_default();
                let host = host.get(0..beta_offset).unwrap_or_default();

                format!("nats://{}{}", resolve(&host).unwrap_or_default(), port)
            })
            .collect();
        println!("Trying to connect to -> {:?}", hosts_parsed);
        match Client::new(hosts_parsed) {
            Ok(mut client) => { 
                println!(".....Connected!");
                client.set_name(&name);
                Ok(MessengerClient {
                    client,
                })
            },
            Err(e) => {
                println!("Connection to Nats is failed: {}", e);
                return Err("Connection to Nats is failed");
            },
        }
    }

    pub fn subscribe(&mut self, subject: &str, queue: Option<&str>) -> Result<(), &'static str> {
        if let Err(e) = self.client.subscribe(subject, queue) {
            println!("Subscribe is failed! {}", e);
            return Err("Subscribe is failed!");
        } else {
            return Ok(());
        }
    }

    pub fn consume_messages(&mut self) -> Result<MessengerEvent, &'static str> {
        match self.client.wait() {
            Ok(event) => {
                let msg = String::from_utf8_lossy(&event.msg).into_owned();

                Ok(MessengerEvent {
                  msg,
                  subject: event.subject,
                  inbox: event.inbox,
                })
            },
            Err(e) => {
                println!("Consume messages is failed! {}", e);
                Err("Consume messages is failed!")
            }
        }
    }

    pub fn produce_message(&mut self, subject: &str, data: &str) -> Result<(), &'static str> {
        println!("About to publish a message to: {}", subject);
        if let Err(e) = self.client.publish(subject, data.as_bytes()) {
            println!("Publish error {}", e);
            return Err("Publish error!");
        } else {
            return Ok(());
        }
    }

    pub fn rpc_call(&mut self, subject: &str, data: &str) -> Result<MessengerEvent, &'static str> {
        println!("About to rpc call to: {}", subject);
        match self.client.make_request(subject, data.as_bytes()) {
            Ok(inbox) => {
                if let Err(e) = self.subscribe(&inbox, None) {
                    println!("Rpc subscribe error {}", e);
                    return Err("Rpc subscribe error!");
                } else {
                    match self.consume_messages() {
                        Ok(res) => {
                            Ok(res)
                        },
                        Err(e) => {
                            println!("Rpc consume messages error {}", e);
                            return Err("Rpc consume messages error!");
                        }
                    }
                }
            },
            Err(e) => {
                println!("Make Rpc request is failed! {}", e);
                Err("Make Rpc request is failed!")
            }
        }
    }
}