use channel;
use connection::Connection;
use protocol::{self, MethodFrame};
use table;
use table::TableEntry::{FieldTable, Bool, LongString};
use framing::Frame;
use amqp_error::{AMQPResult, AMQPError};
use std::sync::{Arc, Mutex};
use std::default::Default;
use std::collections::HashMap;
use std::sync::mpsc::{SyncSender, sync_channel};
use std::thread;
use std::cmp;
use url::{UrlParser, SchemeType};
const CHANNEL_BUFFER_SIZE :usize = 100;
#[derive(Debug)]
pub struct Options <'a> {
pub host: &'a str,
pub port: u16,
pub login: &'a str,
pub password: &'a str,
pub vhost: &'a str,
pub frame_max_limit: u32,
pub channel_max_limit: u16,
pub locale: &'a str
}
impl <'a> Default for Options <'a> {
fn default() -> Options <'a> {
Options {
host: "127.0.0.1", port: 5672, vhost: "/",
login: "guest", password: "guest",
frame_max_limit: 131072, channel_max_limit: 65535,
locale: "en_US"
}
}
}
pub struct Session {
connection: Connection,
channels: Arc<Mutex<HashMap<u16, SyncSender<Frame>> >>,
channel_max_limit: u16,
channel_zero: channel::Channel
}
impl Session {
pub fn open_url(url_string: &str) -> AMQPResult<Session> {
let default: Options = Default::default();
let mut url_parser = UrlParser::new();
url_parser.scheme_type_mapper(scheme_type_mapper);
let url = url_parser.parse(url_string).unwrap();
let vhost = url.serialize_path().unwrap_or(default.vhost.to_string());
let host = url.domain().unwrap_or(default.host);
let port = url.port().unwrap_or(default.port);
let login = url.username().and_then(|username| match username { "" => None, _ => Some(username)} ).unwrap_or(default.login);
let password = url.password().unwrap_or(default.password);
let opts = Options { host: host, port: port,
login: login, password: password,
vhost: &vhost, ..Default::default()};
Session::new(opts)
}
pub fn new(options: Options) -> AMQPResult<Session> {
let connection = try!(Connection::open(options.host, options.port));
let (channel_sender, channel_receiver) = sync_channel(CHANNEL_BUFFER_SIZE); let channels = Arc::new(Mutex::new(HashMap::new()));
let channel_zero = channel::Channel::new(0, channel_receiver, connection.clone());
try!(channels.lock().map_err(|_| AMQPError::SyncError)).insert(0, channel_sender);
let con1 = connection.clone();
let channels_clone = channels.clone();
thread::spawn( || Session::reading_loop(con1, channels_clone ) );
let mut session = Session {
connection: connection,
channels: channels,
channel_max_limit: 0,
channel_zero: channel_zero
};
try!(session.init(options));
Ok(session)
}
fn init(&mut self, options: Options) -> AMQPResult<()> {
debug!("Starting init session");
let frame = self.channel_zero.read(); let method_frame = MethodFrame::decode(frame);
let start : protocol::connection::Start = match method_frame.method_name(){
"connection.start" => protocol::Method::decode(method_frame).ok().unwrap(),
meth => panic!("Unexpected method frame: {:?}", meth) };
debug!("Received connection.start: {:?}", start);
let mut client_properties = table::new();
let mut capabilities = table::new();
capabilities.insert("publisher_confirms".to_string(), Bool(true));
capabilities.insert("consumer_cancel_notify".to_string(), Bool(true));
capabilities.insert("exchange_exchange_bindings".to_string(), Bool(true));
capabilities.insert("basic.nack".to_string(), Bool(true));
capabilities.insert("connection.blocked".to_string(), Bool(true));
capabilities.insert("authentication_failure_close".to_string(), Bool(true));
client_properties.insert("capabilities".to_string(), FieldTable(capabilities));
client_properties.insert("product".to_string(), LongString("rust-amqp".to_string()));
client_properties.insert("platform".to_string(), LongString("rust".to_string()));
client_properties.insert("version".to_string(), LongString("0.1".to_string()));
client_properties.insert("information".to_string(), LongString("https://github.com/Antti/rust-amqp".to_string()));
debug!("Sending connection.start-ok");
let start_ok = protocol::connection::StartOk {
client_properties: client_properties, mechanism: "PLAIN".to_string(),
response: format!("\0{}\0{}", options.login, options.password), locale: options.locale.to_string()};
let tune : protocol::connection::Tune = try!(self.channel_zero.rpc(&start_ok, "connection.tune"));
self.channel_max_limit = negotiate(tune.channel_max, self.channel_max_limit);
self.connection.frame_max_limit = negotiate(tune.frame_max, options.frame_max_limit);
self.channel_zero.connection.frame_max_limit = self.connection.frame_max_limit;
let frame_max_limit = self.connection.frame_max_limit;
let tune_ok = protocol::connection::TuneOk {
channel_max: self.channel_max_limit,
frame_max: frame_max_limit, heartbeat: 0};
self.channel_zero.send_method_frame(&tune_ok);
let open = protocol::connection::Open{virtual_host: options.vhost.to_string(), capabilities: "".to_string(), insist: false };
let _ : protocol::connection::OpenOk = try!(self.channel_zero.rpc(&open, "connection.open-ok"));
Ok(())
}
pub fn open_channel(&mut self, channel_id: u16) -> AMQPResult<channel::Channel> {
debug!("Openning channel: {}", channel_id);
let (sender, receiver) = sync_channel(CHANNEL_BUFFER_SIZE);
let mut channel = channel::Channel::new(channel_id, receiver, self.connection.clone());
try!(self.channels.lock().map_err(|_| AMQPError::SyncError)).insert(channel_id, sender);
try!(channel.open());
Ok(channel)
}
pub fn close(&mut self, reply_code: u16, reply_text: String) {
debug!("Closing session: reply_code: {}, reply_text: {}", reply_code, reply_text);
let close = protocol::connection::Close {reply_code: reply_code, reply_text: reply_text, class_id: 0, method_id: 0};
let _ : protocol::connection::CloseOk = self.channel_zero.rpc(&close, "connection.close-ok").ok().unwrap();
}
pub fn reading_loop(mut connection: Connection, channels: Arc<Mutex<HashMap<u16, SyncSender<Frame>>>>) -> () {
debug!("Starting reading loop");
loop {
let frame = match connection.read() {
Ok(frame) => frame,
Err(some_err) => {debug!("Error in reading loop: {:?}", some_err); break} };
let chans = channels.lock().unwrap();
let ref target_channel = (*chans)[&frame.channel];
target_channel.send(frame).ok().expect("Error sending packet");
}
debug!("Exiting reading loop");
}
}
fn negotiate<T : cmp::Ord>(their_value: T, our_value: T) -> T {
cmp::min(their_value, our_value)
}
fn scheme_type_mapper(scheme: &str) -> SchemeType {
match scheme{
"amqp" => SchemeType::Relative(5672),
_ => {panic!("Uknown scheme: {}", scheme)}
}
}