use std::fmt;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
#[cfg(feature = "security")]
use openssl::ssl::{SslContext, SslStream};
use error::{Error, Result};
#[cfg(not(feature = "security"))]
type KafkaStream = TcpStream;
#[cfg(feature = "security")]
use self::openssled::KafkaStream;
#[cfg(feature = "security")]
mod openssled {
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::time::Duration;
use openssl::ssl::SslStream;
pub enum KafkaStream {
Plain(TcpStream),
Ssl(SslStream<TcpStream>)
}
impl KafkaStream {
fn get_ref(&self) -> &TcpStream {
match self {
&KafkaStream::Plain(ref s) => s,
&KafkaStream::Ssl(ref s) => s.get_ref()
}
}
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.get_ref().set_read_timeout(dur)
}
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.get_ref().set_write_timeout(dur)
}
}
impl Read for KafkaStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
&mut KafkaStream::Plain(ref mut s) => s.read(buf),
&mut KafkaStream::Ssl(ref mut s) => s.read(buf),
}
}
}
impl Write for KafkaStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
&mut KafkaStream::Plain(ref mut s) => s.write(buf),
&mut KafkaStream::Ssl(ref mut s) => s.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match self {
&mut KafkaStream::Plain(ref mut s) => s.flush(),
&mut KafkaStream::Ssl(ref mut s) => s.flush(),
}
}
}
}
pub struct KafkaConnection {
host: String,
stream: KafkaStream,
}
impl fmt::Debug for KafkaConnection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "KafkaConnection to {}", self.host)
}
}
impl KafkaConnection {
pub fn send(&mut self, msg: &[u8]) -> Result<usize> {
self.stream.write(&msg[..]).map_err(From::from)
}
pub fn read_exact(&mut self, size: u64) -> Result<Vec<u8>> {
let mut buffer: Vec<u8> = Vec::with_capacity(size as usize);
let mut s = (&mut self.stream).take(size);
let bytes_read = try!(s.read_to_end(&mut buffer));
if bytes_read != size as usize {
Err(Error::UnexpectedEOF)
} else {
Ok(buffer)
}
}
fn from_stream(stream: KafkaStream, host: &str, timeout_secs: i32) -> KafkaConnection {
if timeout_secs > 0 {
let t = Some(Duration::from_secs(timeout_secs as u64));
stream.set_read_timeout(t).expect("Set connection read-timeout");
stream.set_write_timeout(t).expect("Set connection write-timeout");
}
KafkaConnection{host: host.to_owned(), stream: stream}
}
#[cfg(not(feature = "security"))]
pub fn new(host: &str, timeout_secs: i32) -> Result<KafkaConnection> {
Ok(KafkaConnection::from_stream(try!(TcpStream::connect(host)), host, timeout_secs))
}
#[cfg(feature = "security")]
pub fn new(host: &str, timeout_secs: i32, security: Option<&SslContext>) -> Result<KafkaConnection> {
let stream = try!(TcpStream::connect(host));
let stream = match security {
Some(ctx) => KafkaStream::Ssl(try!(SslStream::connect(ctx, stream))),
None => KafkaStream::Plain(stream),
};
Ok(KafkaConnection::from_stream(stream, host, timeout_secs))
}
}