liner_broker 1.2.2

Redis-based serverless message broker.
Documentation
use crate::redis;
use crate::{UCbackIntern, UData};
use crate::listener::Listener;
use crate::sender::Sender;
use crate::print_error;

use std::net::{SocketAddr, ToSocketAddrs};
use mio::net::TcpListener;
use std::sync::Mutex;
use std::collections::HashMap;

/// A broker client bound to one source topic and one local address.
///
/// The client is created with [`Client::new`], started with `run`, and can
/// then send data to other topics and receive data through its listener.
pub struct Client{
    /// Name given to the redis connection, the listener and the sender.
    unique_name: String,
    /// This client's own topic; sending or subscribing to it is rejected.
    source_topic: String,
    /// Address string that `run` resolves and binds the TCP listener to.
    localhost: String,
    /// Redis connection used for topic registration and address lookup.
    db: redis::Connect,
    /// Created by a successful `run`; `None` before that.
    listener: Option<Listener>,
    /// Created by a successful `run`; `None` before that.
    sender: Option<Sender>,
    /// Per-topic index of the address `send_to` will use next.
    last_send_index: HashMap<String, usize>,
    /// Set to `true` at the end of a successful `run`.
    is_run: bool,
    /// Locked at the start of every public method.
    mtx: Mutex<()>,
    /// Cache of topic -> addresses, filled on first send and by
    /// `refresh_address_topic`.
    address_topic: HashMap<String, Vec<String>>,
    /// Topic key -> topic name, recorded by `subscribe` while the client is
    /// not running and handed to the listener in `run`.
    subscriptions: HashMap<i32, String>,
}

impl Client {
    pub fn new(unique_name: &str, topic: &str, localhost: &str, redis_path: &str) -> Option<Client> {
        let mut db = redis::Connect::new(unique_name, redis_path).ok()?;
        db.set_source_topic(topic);
        db.set_source_localhost(localhost);        
        Some(
            Self{
                unique_name: unique_name.to_string(),
                source_topic: topic.to_string(),
                localhost: localhost.to_string(),
                db,
                listener: None,
                sender: None,
                last_send_index: HashMap::new(),
                is_run: false,
                mtx: Mutex::new(()),
                address_topic: HashMap::new(),
                subscriptions: HashMap::new(), 
            }
        )
    }
    /// Starts the client.
    ///
    /// Registers the source topic in redis, binds a TCP listener on the
    /// configured local address, then creates the listener and the sender and
    /// lets the sender load its previous connections.
    ///
    /// `receive_cb` and `udata` are handed to the listener unchanged.
    ///
    /// Returns `true` on success and also when the client is already running
    /// (an error is printed in that case). Returns `false` when the topic
    /// cannot be registered, the address cannot be resolved, or the bind fails.
    pub fn run(&mut self, receive_cb: UCbackIntern, udata: UData) -> bool {
        let _guard = self.mtx.lock();
        if self.is_run{
            print_error!("client already is running");
            return true;
        }
        if let Err(err) = self.db.regist_topic(&self.source_topic){
            print_error!(&format!("{}", err));
            return false;
        }
        let bind_addr = match str_to_socket_addr(&self.localhost) {
            Some(resolved) => resolved,
            None => return false,
        };
        let tcp_listener = match TcpListener::bind(bind_addr) {
            Ok(bound) => bound,
            Err(err) => {
                print_error!(&format!("{}", err));
                return false;
            }
        };

        let listener = Listener::new(
            tcp_listener,
            &self.unique_name,
            &self.db.redis_path(),
            &self.source_topic,
            &self.subscriptions,
            receive_cb,
            udata,
        );
        self.listener = Some(listener);

        let mut sender = Sender::new(
            &self.unique_name,
            &self.db.redis_path(),
            &self.source_topic,
        );
        sender.load_prev_connects(&mut self.db);
        self.sender = Some(sender);

        self.is_run = true;
        true
    }

    /// Sends `data` to one address of `topic`, rotating through the topic's
    /// known addresses on successive calls.
    ///
    /// The addresses of a topic are looked up in redis the first time the
    /// topic is used and cached afterwards; call `refresh_address_topic` to
    /// update the cache. `at_least_once_delivery` is passed to the sender
    /// unchanged.
    ///
    /// Returns the sender's result, or `false` (after printing an error) when
    /// the client is not running, `topic` is the client's own topic, or no
    /// address is known for `topic`. This method does not panic on a poisoned
    /// lock, matching the other methods of `Client`.
    pub fn send_to(&mut self, topic: &str, data: &[u8], at_least_once_delivery: bool) -> bool {
        // A poisoned lock result still carries the guard, so keeping the
        // result alive holds the mutex without panicking.
        let _lock = self.mtx.lock();
        if !self.is_run{
            print_error!("you can't send_to because client not is running");
            return false;
        }
        if topic == self.source_topic{
            print_error!("you can't send on your own topic");
            return false;
        }
        if !self.address_topic.contains_key(topic){
            if let Some(addr) = get_address_topic(topic, &mut self.db){
                self.address_topic.insert(topic.to_string(), addr);
            }
        }
        let address = match self.address_topic.get(topic) {
            Some(address) => address,
            None => {
                print_error!(&format!("not found addr for topic {}", topic));
                return false;
            }
        };
        // `run` sets the sender before `is_run`; treat a missing sender as
        // "not running" instead of panicking.
        let sender = match self.sender.as_mut() {
            Some(sender) => sender,
            None => {
                print_error!("you can't send_to because client not is running");
                return false;
            }
        };

        let index = self.last_send_index.entry(topic.to_string()).or_insert(0);
        // Wrap around once every address has been used.
        if *index >= address.len(){
            *index = 0;
        }
        // Checked access: an empty address list is reported like a missing one.
        let addr = match address.get(*index) {
            Some(addr) => addr,
            None => {
                print_error!(&format!("not found addr for topic {}", topic));
                return false;
            }
        };
        let ok = sender.send_to(&mut self.db, addr, topic, data, at_least_once_delivery);
        *index += 1;
        ok
    }

    /// Sends `data` to every known address of `topic`.
    ///
    /// The addresses of a topic are looked up in redis the first time the
    /// topic is used and cached afterwards; call `refresh_address_topic` to
    /// update the cache. `at_least_once_delivery` is passed to the sender
    /// unchanged.
    ///
    /// Every address is attempted even if an earlier send failed. Returns
    /// `true` only when all sends succeeded, and `false` (after printing an
    /// error) when the client is not running, `topic` is the client's own
    /// topic, or no address is known for `topic`. This method does not panic
    /// on a poisoned lock, matching the other methods of `Client`.
    pub fn send_all(&mut self, topic: &str, data: &[u8], at_least_once_delivery: bool) -> bool {
        // A poisoned lock result still carries the guard, so keeping the
        // result alive holds the mutex without panicking.
        let _lock = self.mtx.lock();
        if !self.is_run{
            print_error!("you can't send_all because client not is running");
            return false;
        }
        if topic == self.source_topic{
            print_error!("you can't send on your own topic");
            return false;
        }
        if !self.address_topic.contains_key(topic){
            if let Some(addr) = get_address_topic(topic, &mut self.db){
                self.address_topic.insert(topic.to_string(), addr);
            }
        }
        let addresses = match self.address_topic.get(topic) {
            Some(addresses) => addresses,
            None => {
                print_error!(&format!("not found addr for topic {}", topic));
                return false;
            }
        };
        // `run` sets the sender before `is_run`; treat a missing sender as
        // "not running" instead of panicking.
        let sender = match self.sender.as_mut() {
            Some(sender) => sender,
            None => {
                print_error!("you can't send_all because client not is running");
                return false;
            }
        };

        let mut ok = true;
        for addr in addresses {
            // `&=` does not short-circuit: a failed send does not stop the loop.
            ok &= sender.send_to(&mut self.db, addr, topic, data, at_least_once_delivery);
        }
        ok
    }

    /// Subscribes this client to `topic`.
    ///
    /// Registers the topic in redis and fetches its key. While the client is
    /// running the subscription is passed to the listener; otherwise it is
    /// recorded locally so that `run` can hand it to the listener later.
    ///
    /// Returns `false` (after printing an error) when `topic` is the client's
    /// own topic or a redis call fails.
    pub fn subscribe(&mut self, topic: &str) -> bool {
        let _guard = self.mtx.lock();
        if topic == self.source_topic{
            print_error!("you can't subscribe on your own topic");
            return false;
        }
        if let Err(err) = self.db.regist_topic(topic){
            print_error!(&format!("{}", err));
            return false;
        }
        let topic_key = match self.db.get_topic_key(topic) {
            Ok(key) => key,
            Err(err) => {
                print_error!(&format!("{}", err));
                return false;
            }
        };
        if !self.is_run {
            self.subscriptions.insert(topic_key, topic.to_owned());
        } else {
            self.listener.as_mut().unwrap().subscribe(topic, topic_key);
        }
        true
    }

    /// Removes this client's subscription to `topic`.
    ///
    /// Unregisters the topic in redis and fetches its key. While the client is
    /// running the listener is told to drop the subscription; otherwise the
    /// locally recorded subscription is removed.
    ///
    /// Returns `false` (after printing an error) when `topic` is the client's
    /// own topic or a redis call fails.
    pub fn unsubscribe(&mut self, topic: &str) -> bool {
        let _guard = self.mtx.lock();
        if topic == self.source_topic{
            print_error!("you can't unsubscribe on your own topic");
            return false;
        }
        if let Err(err) = self.db.unregist_topic(topic){
            print_error!(&format!("{}", err));
            return false;
        }
        let topic_key = match self.db.get_topic_key(topic) {
            Ok(key) => key,
            Err(err) => {
                print_error!(&format!("{}", err));
                return false;
            }
        };
        if !self.is_run {
            self.subscriptions.remove(&topic_key);
        } else {
            self.listener.as_mut().unwrap().unsubscribe(topic_key);
        }
        true
    }

    /// Reloads the cached addresses of `topic` from redis.
    ///
    /// Returns `true` when at least one address was found and the cache was
    /// replaced; on `false` the previous cache entry is left untouched.
    pub fn refresh_address_topic(&mut self, topic: &str) -> bool {
        let _guard = self.mtx.lock();

        match get_address_topic(topic, &mut self.db) {
            Some(addresses) => {
                self.address_topic.insert(topic.to_string(), addresses);
                true
            }
            None => false,
        }
    }

    /// Clears the stored messages through the redis connection.
    ///
    /// Only allowed while the client is not running. Returns `false` (after
    /// printing an error) when the client is running or the redis call fails.
    pub fn clear_stored_messages(&mut self) -> bool {
        let _guard = self.mtx.lock();
        if self.is_run{
            print_error!("you can't clear_stored_messages because client already is running");
            return false;
        }
        match self.db.clear_stored_messages() {
            Ok(_) => true,
            Err(err) => {
                print_error!(&format!("{}", err));
                false
            }
        }
    }
    /// Clears the topic addresses through the redis connection.
    ///
    /// Only allowed while the client is not running. Returns `false` (after
    /// printing an error) when the client is running or the redis call fails.
    pub fn clear_addresses_of_topic(&mut self) -> bool {
        let _guard = self.mtx.lock();
        if self.is_run{
            print_error!("you can't clear_addresses_of_topic because client already is running");
            return false;
        }
        match self.db.clear_addresses_of_topic() {
            Ok(_) => true,
            Err(err) => {
                print_error!(&format!("{}", err));
                false
            }
        }
    }
}

/// Fetches the addresses of `topic` from redis.
///
/// Returns `None` when the lookup fails (the error is printed) or when the
/// topic has no addresses.
fn get_address_topic(topic: &str, db: &mut redis::Connect)->Option<Vec<String>>{
    let addresses = match db.get_addresses_of_topic(true, topic) {
        Ok(found) => found,
        Err(err) => {
            print_error!(&format!("{}", err));
            return None;
        }
    };
    if addresses.is_empty() {
        None
    } else {
        Some(addresses)
    }
}

/// Resolves `localhost` to a socket address and returns the first result.
///
/// Returns `None` when resolution fails (the error is printed) or yields no
/// address.
fn str_to_socket_addr(localhost: &str)->Option<SocketAddr>{
    let mut resolved = match localhost.to_socket_addrs() {
        Ok(addrs) => addrs,
        Err(err) => {
            print_error!(&format!("{}", err));
            return None;
        }
    };
    resolved.next()
}

impl Drop for Client {
    /// Releases the sender and then the listener of a running client.
    ///
    /// Does nothing beyond taking the lock when the client was never started.
    fn drop(&mut self) {
        let _guard = self.mtx.lock();
        if self.is_run {
            // Assigning `None` drops the old value; the sender goes first.
            self.sender = None;
            self.listener = None;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A string without a port cannot be turned into a socket address.
    #[test]
    fn str_to_socket_addr_rejects_invalid() {
        let parsed = str_to_socket_addr("not-a-socket-addr");
        assert_eq!(parsed, None);
    }

    /// Both a literal IP and a host name with a port must resolve.
    #[test]
    fn str_to_socket_addr_accepts_localhost_port() {
        for input in &["127.0.0.1:0", "localhost:0"] {
            assert!(str_to_socket_addr(input).is_some());
        }
    }
}