use std::sync::Arc;
use std::cmp::Ordering;
use std::net::SocketAddr;
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::io::{Cursor, Result, Error, ErrorKind};
use mqtt311::{MqttWrite, QoS, LastWill, Packet, Publish};
use tcp::{Socket,
utils::{ContextHandle, Hibernate, Ready}};
use ws::connect::WsSocket;
use crate::{v311::WsMqtt311,
tls_v311::WssMqtt311,
utils::{ValueEq, BrokerSession}};
pub trait MqttSession: Debug + Send + Sync + 'static {
type Connect;
fn get_connect(&self) -> Option<&Self::Connect>;
fn bind_connect(&mut self, connect: Self::Connect);
fn unbind_connect(&mut self) -> Option<Self::Connect>;
fn is_accepted(&self) -> bool;
fn set_accept(&self, connect: bool);
fn is_clean(&self) -> bool;
fn set_clean(&self, clean: bool);
fn get_will(&self) -> Option<(&str, &str, u8, bool)>;
fn set_will(&mut self, topic: String, msg: String, qos: u8, retain: bool);
fn unset_will(&mut self) -> Option<(String, String, u8, bool)>;
fn get_user(&self) -> Option<&str>;
fn get_pwd(&self) -> Option<&str>;
fn set_user_pwd(&mut self, user: Option<String>, pwd: Option<String>);
fn unsend_packet(&self) -> Option<&[Vec<u8>]> {
None
}
fn unconfirm_sended(&self) -> Option<&[Vec<u8>]> {
None
}
fn unconfirm_received(&self) -> Option<&[Vec<u8>]> {
None
}
fn get_keep_alive(&self) -> u16;
fn set_keep_alive(&mut self, keep_alive: u16);
}
pub trait MqttConnect<S: Socket>: Debug + Send + Sync + 'static {
fn is_closed(&self) -> bool;
fn get_token(&self) -> Option<usize>;
fn get_uid(&self) -> Option<usize>;
fn get_local_addr(&self) -> Option<SocketAddr>;
fn get_remote_addr(&self) -> Option<SocketAddr>;
fn is_security(&self) -> bool;
fn is_passive_receive(&self) -> bool;
fn passive_receive(&self, b: bool);
fn get_session(&self) -> Option<ContextHandle<BrokerSession>>;
fn send(&self, topic: &String, payload: Arc<Vec<u8>>) -> Result<()>;
fn hibernate(&self, ready: Ready) -> Option<Hibernate<S>>;
fn wakeup(&self, result: Result<()>) -> bool;
fn close(&self, reason: Result<()>) -> Result<()>;
}
pub struct QosZeroSession<S: Socket> {
connect: Option<WsSocket<S>>, is_passive: Arc<AtomicBool>, client_id: String, is_accepted: Arc<AtomicBool>, is_clean: Arc<AtomicBool>, will: Option<LastWill>, user: Option<String>, pwd: Option<String>, keep_alive: u16, }
unsafe impl<S: Socket> Send for QosZeroSession<S> {}
unsafe impl<S: Socket> Sync for QosZeroSession<S> {}
impl<S: Socket> PartialEq<QosZeroSession<S>> for QosZeroSession<S> {
fn eq(&self, other: &Self) -> bool {
self.client_id.eq(&other.client_id)
}
}
impl<S: Socket> Eq for QosZeroSession<S> {}
impl<S: Socket> PartialOrd<QosZeroSession<S>> for QosZeroSession<S> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.client_id.partial_cmp(&other.client_id)
}
}
impl<S: Socket> Ord for QosZeroSession<S> {
fn cmp(&self, other: &Self) -> Ordering {
self.client_id.cmp(&other.client_id)
}
}
impl<S: Socket> Hash for QosZeroSession<S> {
fn hash<H: Hasher>(&self, state: &mut H) {
if let Some(ws) = &self.connect {
ws.get_uid().hash(state);
}
}
}
impl<S: Socket> Debug for QosZeroSession<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "QosZeroSession {{ client_id: {:?}, is_accepted: {}, is_clean: {}, keep_alive: {} }}",
self.client_id,
self.is_accepted.load(AtomicOrdering::Relaxed),
self.is_clean.load(AtomicOrdering::Relaxed),
self.keep_alive)
}
}
impl<S: Socket> Clone for QosZeroSession<S> {
fn clone(&self) -> Self {
QosZeroSession {
connect: self.connect.clone(),
is_passive: self.is_passive.clone(),
client_id: self.client_id.clone(),
is_accepted: self.is_accepted.clone(),
is_clean: self.is_clean.clone(),
will: self.will.clone(),
user: self.user.clone(),
pwd: self.pwd.clone(),
keep_alive: self.keep_alive,
}
}
}
impl<S: Socket> ValueEq for Arc<QosZeroSession<S>> {
fn value_eq(this: &Self, other: &Self) -> bool {
Arc::ptr_eq(this, other)
}
}
impl<S: Socket> MqttSession for QosZeroSession<S> {
type Connect = WsSocket<S>;
fn get_connect(&self) -> Option<&Self::Connect> {
self.connect.as_ref()
}
fn bind_connect(&mut self, connect: Self::Connect) {
self.connect = Some(connect);
}
fn unbind_connect(&mut self) -> Option<Self::Connect> {
self.connect.take()
}
fn is_accepted(&self) -> bool {
self.is_accepted.load(AtomicOrdering::Relaxed)
}
fn set_accept(&self, accept: bool) {
self.is_accepted.store(accept, AtomicOrdering::SeqCst);
}
fn is_clean(&self) -> bool {
self.is_clean.load(AtomicOrdering::Relaxed)
}
fn set_clean(&self, clean: bool) {
self.is_clean.store(clean, AtomicOrdering::SeqCst);
}
fn get_will(&self) -> Option<(&str, &str, u8, bool)> {
if let Some(w) = &self.will {
return Some((&w.topic, &w.message, w.qos.to_u8(), w.retain));
}
None
}
fn set_will(&mut self, topic: String, message: String, qos: u8, retain: bool) {
self.will = Some(LastWill {
topic,
message,
qos: QoS::from_u8(qos).unwrap(),
retain,
});
}
fn unset_will(&mut self) -> Option<(String, String, u8, bool)> {
if let Some(w) = self.will.take() {
return Some((w.topic, w.message, w.qos.to_u8(), w.retain));
}
None
}
fn get_user(&self) -> Option<&str> {
if let Some(u) = &self.user {
return Some(u.as_str());
}
None
}
fn get_pwd(&self) -> Option<&str> {
if let Some(p) = &self.pwd {
return Some(p.as_str());
}
None
}
fn set_user_pwd(&mut self, user: Option<String>, pwd: Option<String>) {
self.user = user;
self.pwd = pwd;
}
fn get_keep_alive(&self) -> u16 {
self.keep_alive
}
fn set_keep_alive(&mut self, keep_alive: u16) {
self.keep_alive = keep_alive;
}
}
impl<S: Socket> MqttConnect<S> for QosZeroSession<S> {
fn is_closed(&self) -> bool {
if let Some(connect) = &self.connect {
return connect.is_closed();
}
true
}
fn get_token(&self) -> Option<usize> {
if let Some(connect) = &self.connect {
return Some(connect.get_token().0);
}
None
}
fn get_uid(&self) -> Option<usize> {
if let Some(connect) = &self.connect {
return Some(connect.get_uid());
}
None
}
fn get_local_addr(&self) -> Option<SocketAddr> {
if let Some(connect) = &self.connect {
return Some(connect.get_local().clone());
}
None
}
fn get_remote_addr(&self) -> Option<SocketAddr> {
if let Some(connect) = &self.connect {
return Some(connect.get_remote().clone());
}
None
}
fn is_security(&self) -> bool {
if let Some(connect) = &self.connect {
return connect.is_security();
}
false
}
fn is_passive_receive(&self) -> bool {
self.is_passive.load(AtomicOrdering::Relaxed)
}
fn passive_receive(&self, b: bool) {
self.is_passive.store(b, AtomicOrdering::SeqCst);
}
fn get_session(&self) -> Option<ContextHandle<BrokerSession>> {
if let Some(connect) = &self.connect {
if connect.is_closed() {
return None;
}
if let Some(session) = connect.get_session() {
return session.as_ref().get_context().get::<BrokerSession>();
}
}
None
}
fn send(&self, topic: &String, payload: Arc<Vec<u8>>) -> Result<()> {
if let Some(connect) = &self.connect {
let packet = Packet::Publish(Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic_name: topic.clone(),
pkid: None,
payload,
});
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
if let Err(e) = buf.write_packet(&packet) {
return Err(Error::new(ErrorKind::InvalidData,
format!("Mqtt session send failed, reason: {:?}",
e)));
}
if connect.is_security() {
return connect.send(WssMqtt311::WS_MSG_TYPE, buf.into_inner());
} else {
return connect.send(WsMqtt311::WS_MSG_TYPE, buf.into_inner());
}
}
Ok(())
}
fn hibernate(&self, ready: Ready) -> Option<Hibernate<S>> {
if let Some(connect) = &self.connect {
connect.hibernate(ready)
} else {
None
}
}
fn wakeup(&self, result: Result<()>) -> bool {
if let Some(connect) = &self.connect {
connect.wakeup(result)
} else {
true
}
}
fn close(&self, reason: Result<()>) -> Result<()> {
if let Some(connect) = &self.connect {
return connect.close(reason);
}
Ok(())
}
}
impl<S: Socket> QosZeroSession<S> {
pub fn with_client_id(client_id: String) -> Self {
QosZeroSession {
connect: None,
is_passive: Arc::new(AtomicBool::new(false)),
client_id,
is_accepted: Arc::new(AtomicBool::new(false)),
is_clean: Arc::new(AtomicBool::new(false)),
will: None,
user: None,
pwd: None,
keep_alive: 0,
}
}
}
impl ValueEq for usize {
fn value_eq(this: &Self, other: &Self) -> bool {
this == other
}
}