1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
extern crate zmtp;

use core::*;
use std::collections::VecDeque;
use std::io::ErrorKind::*;
use std::io::{Result as Result, Write};
use std::net::{SocketAddr, ToSocketAddrs, TcpStream};
use std::u32;
use zmtp::{Frame as Frame, Message as Message, Peer as Peer,
           SocketType as SocketType};


/// Creates a REQ socket connected to `address`, ready to send a request.
///
/// The first socket address that `address` resolves to is stored in the
/// returned `ReqSend` so the socket can reconnect later; the initial
/// connection itself is made through `ReqSend::connect` with the original
/// `address` value.
///
/// # Errors
///
/// Returns an error when `address` cannot be resolved, when it resolves to
/// no socket address at all (`ErrorKind::InvalidInput`), or when
/// `ReqSend::connect` fails.
pub fn req<A: ToSocketAddrs>(address: A) -> Result<ReqSend> {
    // Resolution is I/O and may fail or come back empty; report both cases
    // to the caller instead of panicking.
    let resolved = address
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| {
            ::std::io::Error::new(
                ::std::io::ErrorKind::InvalidInput,
                "address did not resolve to any socket address",
            )
        })?;

    let mut r = ReqSend {
        address: resolved,
        peers: VecDeque::new(),
    };
    r.connect(&address)?;
    Ok(r)
}


/// REQ socket in the state where the next operation is sending a request.
///
/// `send_request` consumes this value and hands back a `ReqReceive` that
/// carries the same address and peers.
#[derive(Debug)]
pub struct ReqSend {
    /// Address used to reconnect whenever `peers` is found empty.
    address: SocketAddr,
    /// Connected peers; `connect` appends each new peer at the back.
    peers: VecDeque<Peer>,
}

impl ReqSend {
    /// Sends `req` to a peer and moves the socket into its receiving state.
    ///
    /// The outgoing message is the value returned by
    /// `Message::empty_frame()` with one frame built from `req` added to it.
    /// When sending fails, a warning is printed, the peer is removed and the
    /// send is retried; when no peer is left, the socket reconnects to its
    /// stored address first.
    ///
    /// # Panics
    ///
    /// Panics if `req` is longer than `u32::MAX` bytes, or if reconnecting to
    /// the stored address fails.
    pub fn send_request(mut self, req: &str) -> ReqReceive {
        assert!(req.len() <= u32::MAX as usize);
        let frame = Frame::from_str(&req);
        let mut msg = Message::empty_frame();
        msg.add_frame(frame);

        loop {
            if self.peers.is_empty() {
                let address = self.address;
                self.connect(address).unwrap();
            }

            match self.peer().send_msg(&msg) {
                Ok(()) => {
                    self.flush_peer();
                    break
                },
                Err(err) => {
                    println!("WARN: {:?}: {:?}", err.kind(), err);
                    // Drop the failing peer and go around again.
                    self.remove_peer();
                },
            }
        }

        ReqReceive { address: self.address, peers: self.peers }
    }

    /// Connects to `address`, performs the version handshake as a REQ socket
    /// and appends the new peer to `peers`.
    ///
    /// A refused connection is retried indefinitely, pausing briefly between
    /// attempts, so this call blocks until the remote end accepts or another
    /// error occurs.
    ///
    /// # Errors
    ///
    /// Returns any connection error other than `ConnectionRefused`, and any
    /// error reported by the version handshake (the peer is then discarded).
    pub fn connect<A: ToSocketAddrs>(&mut self, address: A) -> Result<()> {
        // Pause between refused attempts so the retry loop does not spin a
        // CPU core or flood the remote host with connection attempts.
        let retry_delay = ::std::time::Duration::from_millis(100);

        loop {
            match TcpStream::connect(&address) {
                Err(ref err) if err.kind() == ConnectionRefused => {
                    ::std::thread::sleep(retry_delay);
                },
                Err(err) => return Err(err),
                Ok(stream) => {
                    let mut peer = Peer::new(Box::new(stream));
                    let result = peer
                        .local_socket_type(SocketType::REQ)
                        .execute_version_handshake();
                    return result.map(|_| self.peers.push_back(peer))
                },
            }
        }
    }

    // Peer-management helpers generated by the `methods!` macro, which is
    // defined outside this file.
    methods! {
        has_peers peer_count peer flush_peer add_peer remove_peer rotate_peers
    }
}


/// REQ socket in the state where the next operation is receiving a response.
///
/// `receive_response` consumes this value and hands back a `ReqSend` that
/// carries the same address and peers.
#[derive(Debug)]
pub struct ReqReceive {
    /// Address used to reconnect whenever `peers` is found empty.
    address: SocketAddr,
    /// Connected peers; `connect` appends each new peer at the back.
    peers: VecDeque<Peer>,
}

impl ReqReceive {
    /// Receives one response, stores the outcome in `resp` and moves the
    /// socket back into its sending state.
    ///
    /// On success the bodies of the message frames, after skipping the first
    /// `EMPTY_FRAME` frames, are decoded as UTF-8 one frame at a time and
    /// concatenated into `resp`; the peers are then rotated. When receiving
    /// fails, a warning is printed, the peer is removed and the error is
    /// stored in `resp`. When a frame body is not valid UTF-8, `resp` is set
    /// to an `ErrorKind::InvalidData` error wrapping the decoding error.
    ///
    /// # Panics
    ///
    /// Panics if no peer is left and reconnecting to the stored address
    /// fails.
    pub fn receive_response(mut self, resp: &mut Result<String>) -> ReqSend {
        if self.peers.is_empty() {
            let address = self.address;
            self.connect(address).unwrap();
        }

        match self.peer().recv_msg() {
            Err(err) => {
                println!("Req WARN: {:?}: {:?}", err.kind(), err);
                self.remove_peer();
                *resp = Err(err);
            },
            Ok(msg) => {
                let mut text = String::new();
                let mut failure = None;
                for frame in msg.into_frame_iter().skip(EMPTY_FRAME) {
                    // The bytes come from the remote peer, so invalid UTF-8
                    // is reported to the caller rather than treated as a bug.
                    match String::from_utf8(frame.get_body()) {
                        Ok(part) => text.push_str(&part),
                        Err(err) => {
                            failure = Some(err);
                            break
                        },
                    }
                }
                *resp = match failure {
                    None => Ok(text),
                    Some(err) => Err(::std::io::Error::new(
                        ::std::io::ErrorKind::InvalidData,
                        err,
                    )),
                };
                self.rotate_peers();
            },
        }
        // TODO: Discard any msgs available in `peers` W/O blocking the thread
        ReqSend { address: self.address, peers: self.peers }
    }

    /// Connects to `address`, performs the version handshake as a REQ socket
    /// and appends the new peer to `peers`.
    ///
    /// A refused connection is retried indefinitely, pausing briefly between
    /// attempts, so this call blocks until the remote end accepts or another
    /// error occurs.
    ///
    /// # Errors
    ///
    /// Returns any connection error other than `ConnectionRefused`, and any
    /// error reported by the version handshake (the peer is then discarded).
    pub fn connect<A: ToSocketAddrs>(&mut self, address: A) -> Result<()> {
        // Pause between refused attempts so the retry loop does not spin a
        // CPU core or flood the remote host with connection attempts.
        let retry_delay = ::std::time::Duration::from_millis(100);

        loop {
            match TcpStream::connect(&address) {
                Err(ref err) if err.kind() == ConnectionRefused => {
                    ::std::thread::sleep(retry_delay);
                },
                Err(err) => return Err(err),
                Ok(stream) => {
                    let mut peer = Peer::new(Box::new(stream));
                    let result = peer
                        .local_socket_type(SocketType::REQ)
                        .execute_version_handshake();
                    return result.map(|_| self.peers.push_back(peer))
                },
            }
        }
    }

    // Peer-management helpers generated by the `methods!` macro, which is
    // defined outside this file.
    methods! {
        add_peer flush_peer has_peers peer peer_count remove_peer rotate_peers
    }
}

//  LocalWords:  Req