use std::sync::Arc;
use std::net::SocketAddr;
use std::result::Result as GenResult;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use futures::future::{FutureExt, LocalBoxFuture};
use futures::StreamExt;
use log::warn;
use pi_atom::Atom;
use pi_gray::GrayVersion;
use pi_handler::{Args, Handler};
use tcp::{Socket,
utils::{ContextHandle, SocketContext, Hibernate, Ready}};
use mqtt::server::MqttBrokerProtocol;
use mqtt::broker::{MQTT_RESPONSE_SYS_TOPIC, MqttBrokerListener, MqttBrokerService};
use mqtt::session::{MqttSession, MqttConnect};
use mqtt::utils::BrokerSession;
/// Events forwarded by the MQTT proxy to the registered handler.
///
/// In this file every variant starts with the same three fields: the
/// connection id (`MqttConnectHandle::get_id`), the broker name and the
/// client id of the session.
pub enum MqttEvent {
    /// A client connected: `(id, broker name, client id, keep alive,
    /// clean session flag, user, password)`.
    Connect(usize, String, String, u16, bool, Option<String>, Option<String>),
    /// A connection was closed: `(id, broker name, client id, close reason)`.
    Disconnect(usize, String, String, Result<()>),
    /// A subscribe request: `(id, broker name, client id, topics)`.
    /// Each topic is paired with a `u8` (presumably the requested QoS — confirm).
    Sub(usize, String, String, Vec<(String, u8)>),
    /// An unsubscribe request: `(id, broker name, client id, topics)`.
    Unsub(usize, String, String, Vec<String>),
    /// A published message: `(id, broker name, client id, remote address,
    /// topic, payload)`.
    Publish(usize, String, String, Option<SocketAddr>, String, Arc<Vec<u8>>),
}
/// Handle on one MQTT connection that is passed to the event handler
/// together with an `MqttEvent`.
pub struct MqttConnectHandle<S: Socket> {
    /// Gray version of the connection; `-1` encodes "no gray version set".
    gray: AtomicIsize,
    /// Client id of the session this handle was built for.
    client_id: String,
    /// Broker protocol the connection belongs to.
    protocol: MqttBrokerProtocol,
    /// The underlying MQTT connection.
    connect: Arc<dyn MqttConnect<S>>,
    /// `true` when the handle was created for an already closed connection;
    /// address/session accessors then return `None` and `close` is a no-op.
    is_closed: AtomicBool,
}
// SAFETY: NOTE(review) — these impls assert that the handle may be moved to
// and shared between threads. The struct holds an `Arc<dyn MqttConnect<S>>`
// and a `MqttBrokerProtocol`; nothing visible in this file proves that those
// are thread-safe, so the soundness of these impls must be confirmed against
// the `mqtt` crate.
unsafe impl<S: Socket> Send for MqttConnectHandle<S> {}
unsafe impl<S: Socket> Sync for MqttConnectHandle<S> {}
impl<S: Socket> GrayVersion for MqttConnectHandle<S> {
    /// Returns the gray version of the connection.
    ///
    /// NOTE(review): this always yields `&None`, whatever value is stored.
    /// The version lives in an atomic integer, so there is no
    /// `Option<usize>` inside `self` that a reference could point to.
    fn get_gray(&self) -> &Option<usize> {
        // The stored value is still read, but it cannot influence the result.
        let _stored = self.gray.load(Ordering::Relaxed);
        &None
    }

    /// Stores the gray version; `None` is encoded as `-1`.
    fn set_gray(&mut self, gray: Option<usize>) {
        let encoded = match gray {
            Some(n) => n as isize,
            None => -1,
        };
        self.gray.store(encoded, Ordering::SeqCst);
    }

    /// Returns the uid of the underlying connection, or `0` when it has none.
    fn get_id(&self) -> usize {
        self.connect.get_uid().unwrap_or(0)
    }
}
impl<S: Socket> MqttConnectHandle<S> {
    /// Returns the token of the underlying connection, if any.
    pub fn get_token(&self) -> Option<usize> {
        self.connect.get_token()
    }

    /// Returns the uid of the underlying connection, if any.
    pub fn get_uid(&self) -> Option<usize> {
        self.connect.get_uid()
    }

    /// Returns the local address, or `None` when the handle is closed.
    pub fn get_local_addr(&self) -> Option<SocketAddr> {
        if self.is_closed.load(Ordering::Relaxed) {
            None
        } else {
            self.connect.get_local_addr()
        }
    }

    /// Returns the local ip as text, or `None` when the handle is closed
    /// or no local address is available.
    pub fn get_local_ip(&self) -> Option<String> {
        // `get_local_addr` already yields `None` for a closed handle.
        self.get_local_addr().map(|local| local.ip().to_string())
    }

    /// Returns the local port, or `None` when the handle is closed
    /// or no local address is available.
    pub fn get_local_port(&self) -> Option<u16> {
        // `get_local_addr` already yields `None` for a closed handle.
        self.get_local_addr().map(|local| local.port())
    }

    /// Returns the remote address, or `None` when the handle is closed.
    pub fn get_remote_addr(&self) -> Option<SocketAddr> {
        if self.is_closed.load(Ordering::Relaxed) {
            None
        } else {
            self.connect.get_remote_addr()
        }
    }

    /// Returns the remote ip as text, or `None` when the handle is closed
    /// or no remote address is available.
    pub fn get_remote_ip(&self) -> Option<String> {
        // `get_remote_addr` already yields `None` for a closed handle.
        self.get_remote_addr().map(|remote| remote.ip().to_string())
    }

    /// Returns the remote port, or `None` when the handle is closed
    /// or no remote address is available.
    pub fn get_remote_port(&self) -> Option<u16> {
        // `get_remote_addr` already yields `None` for a closed handle.
        self.get_remote_addr().map(|remote| remote.port())
    }

    /// Reports whether the underlying connection is a secure one.
    pub fn is_security(&self) -> bool {
        self.connect.is_security()
    }

    /// Reports whether the underlying connection is in passive receive mode.
    pub fn is_passive(&self) -> bool {
        self.connect.is_passive_receive()
    }

    /// Switches passive receive mode of the underlying connection.
    pub fn set_passive(&self, b: bool) {
        self.connect.passive_receive(b);
    }

    /// Asks the underlying connection to hibernate until `ready`.
    pub fn hibernate(&self, ready: Ready) -> Option<Hibernate<S>> {
        self.connect.hibernate(ready)
    }

    /// Wakes the underlying connection up with `result`;
    /// returns what the connection reports.
    pub fn wakeup(&self, result: Result<()>) -> bool {
        self.connect.wakeup(result)
    }

    /// Subscribes the session of this client to `topic`.
    ///
    /// Nothing happens unless the protocol variant matches the security of
    /// the connection (`WssMqtt311` for secure, `WsMqtt311` otherwise) and the
    /// broker knows a session for the client id. The broker's result is ignored.
    pub fn sub(&self, topic: String) {
        let secure = self.is_security();
        match &self.protocol {
            MqttBrokerProtocol::WssMqtt311(broker) if secure => {
                if let Some(session) = broker.get_broker().get_session(&self.client_id) {
                    let _ = broker.get_broker().subscribe(session.clone(), topic);
                }
            },
            MqttBrokerProtocol::WsMqtt311(broker) if !secure => {
                if let Some(session) = broker.get_broker().get_session(&self.client_id) {
                    let _ = broker.get_broker().subscribe(session.clone(), topic);
                }
            },
            _ => (),
        }
    }

    /// Unsubscribes the session of this client from `topic`.
    ///
    /// Uses the same protocol/security matching as `sub`; the broker's
    /// result is ignored.
    pub fn unsub(&self, topic: String) {
        let secure = self.is_security();
        match &self.protocol {
            MqttBrokerProtocol::WssMqtt311(broker) if secure => {
                if let Some(session) = broker.get_broker().get_session(&self.client_id) {
                    let _ = broker.get_broker().unsubscribe(&session, topic);
                }
            },
            MqttBrokerProtocol::WsMqtt311(broker) if !secure => {
                if let Some(session) = broker.get_broker().get_session(&self.client_id) {
                    let _ = broker.get_broker().unsubscribe(&session, topic);
                }
            },
            _ => (),
        }
    }

    /// Returns the session of the connection, or `None` when the handle is closed.
    pub fn get_session(&self) -> Option<ContextHandle<BrokerSession>> {
        if self.is_closed.load(Ordering::Relaxed) {
            None
        } else {
            self.connect.get_session()
        }
    }

    /// Sends `bin` to the connection on `topic`.
    pub fn send(&self, topic: &String, bin: Vec<u8>) {
        let payload = Arc::new(bin);
        self.connect.send(topic, payload);
    }

    /// Sends `bin` on the response system topic and, when the connection is
    /// in passive receive mode, wakes it up.
    pub fn reply(&self, bin: Vec<u8>) {
        self.send(&MQTT_RESPONSE_SYS_TOPIC, bin);
        if !self.is_passive() {
            return;
        }

        // Busy-wait: keep retrying until the connection accepts the wakeup.
        loop {
            if self.wakeup(Ok(())) {
                break;
            }
        }
    }

    /// Closes the underlying connection with `reason`.
    ///
    /// A closed handle returns `Ok(())` without touching the connection.
    pub fn close(&self, reason: Result<()>) -> Result<()> {
        if self.is_closed.load(Ordering::Relaxed) {
            Ok(())
        } else {
            self.connect.close(reason)
        }
    }
}
/// Listener that forwards connect and close notifications of the MQTT
/// broker to an optional handler as `MqttEvent`s.
pub struct MqttProxyListener {
    /// Handler receiving `MqttEvent::Connect` / `MqttEvent::Disconnect`;
    /// `None` when no handler has been installed.
    connect_handler: Option<
        Arc<
            dyn Handler<
                A = MqttEvent,
                B = (),
                C = (),
                D = (),
                E = (),
                F = (),
                G = (),
                H = (),
                HandleResult = (),
            >,
        >,
    >,
}
// SAFETY: NOTE(review) — asserts that the listener may be moved across
// threads. Its only field is an `Option<Arc<dyn Handler<..>>>` without a
// visible `Send + Sync` bound on the trait object; confirm that every handler
// installed here is thread-safe.
unsafe impl Send for MqttProxyListener {}
impl<S: Socket> MqttBrokerListener<S> for MqttProxyListener {
    /// Called when a client has connected.
    ///
    /// With a handler installed, the returned future reads the session of the
    /// connection, hands an `MqttEvent::Connect` to the handler and resolves
    /// to `Ok(())`; it also resolves to `Ok(())`, without notifying anyone,
    /// when the connection has no session. Without a handler the future
    /// resolves to an error.
    fn connected(&self,
                 protocol: MqttBrokerProtocol,
                 connect: Arc<dyn MqttConnect<S>>) -> LocalBoxFuture<'static, Result<()>> {
        let handler = match &self.connect_handler {
            Some(handler) => handler.clone(),
            None => {
                // No handler installed: report the connect as failed.
                return async move {
                    Err(Error::new(ErrorKind::Other,
                                   format!("Mqtt proxy connect failed, connect: {:?}, reason: handle connect error",
                                           connect)))
                }.boxed_local();
            },
        };

        async move {
            let mut handle = match connect.get_session() {
                Some(handle) => handle,
                None => return Ok(()),
            };
            let session = match handle.as_mut() {
                Some(session) => session,
                None => return Ok(()),
            };

            let connect_handle = MqttConnectHandle {
                gray: AtomicIsize::new(-1),
                client_id: session.get_client_id().clone(),
                protocol,
                connect,
                is_closed: AtomicBool::new(false),
            };
            let uid = connect_handle.get_id();
            let broker_name = connect_handle.protocol.get_broker_name().to_string();
            let event = MqttEvent::Connect(uid,
                                           broker_name,
                                           session.get_client_id().clone(),
                                           session.get_keep_alive(),
                                           session.is_clean_session(),
                                           session.get_user().cloned(),
                                           session.get_pwd().cloned());

            handler.handle(Arc::new(connect_handle),
                           Atom::from(""),
                           Args::OneArgs(event)).await;
            Ok(())
        }.boxed_local()
    }

    /// Called when a connection has been closed.
    ///
    /// Logs a warning when the connection was closed by an error. With a
    /// handler installed, an `MqttEvent::Disconnect` is handed to it through a
    /// handle that is marked as closed; otherwise an empty future is returned.
    fn closed(&self,
              protocol: MqttBrokerProtocol,
              connect: Arc<dyn MqttConnect<S>>,
              mut context: BrokerSession,
              reason: Result<()>) -> LocalBoxFuture<'static, ()> {
        if let Err(e) = &reason {
            warn!("Mqtt proxy connect close by error, token: {:?}, remote: {:?}, local: {:?}, reason: {:?}",
                connect.get_token(),
                connect.get_remote_addr(),
                connect.get_local_addr(),
                e);
        }

        let handler = match &self.connect_handler {
            Some(handler) => handler,
            None => return async move {}.boxed_local(),
        };

        let connect_handle = MqttConnectHandle {
            gray: AtomicIsize::new(-1),
            client_id: context.get_client_id().clone(),
            protocol,
            connect,
            // The connection is gone, so the handle must not touch it anymore.
            is_closed: AtomicBool::new(true),
        };
        let uid = connect_handle.get_id();
        let broker_name = connect_handle.protocol.get_broker_name().to_string();
        let event = MqttEvent::Disconnect(uid,
                                          broker_name,
                                          context.get_client_id().clone(),
                                          reason);

        handler.handle(Arc::new(connect_handle),
                       Atom::from(""),
                       Args::OneArgs(event))
    }
}
impl MqttProxyListener {
    /// Creates a listener without a handler.
    pub fn new() -> Self {
        Self::with_handler(None)
    }

    /// Creates a listener that forwards its events to `connect_handler`.
    pub fn with_handler(connect_handler: Option<Arc<dyn Handler<
        A = MqttEvent,
        B = (),
        C = (),
        D = (),
        E = (),
        F = (),
        G = (),
        H = (),
        HandleResult = ()>>>) -> Self {
        Self {
            connect_handler: connect_handler,
        }
    }

    /// Replaces the handler; passing `None` removes it.
    pub fn set_connect_handler(&mut self,
                               handler: Option<Arc<dyn Handler<
                                   A = MqttEvent,
                                   B = (),
                                   C = (),
                                   D = (),
                                   E = (),
                                   F = (),
                                   G = (),
                                   H = (),
                                   HandleResult = ()>>>) {
        self.connect_handler = handler;
    }
}
/// Service that forwards subscribe, unsubscribe and publish requests of
/// the MQTT broker to an optional handler as `MqttEvent`s.
pub struct MqttProxyService {
    /// Handler receiving `MqttEvent::Sub` / `Unsub` / `Publish`;
    /// `None` when no handler has been installed.
    request_handler: Option<
        Arc<
            dyn Handler<
                A = MqttEvent,
                B = (),
                C = (),
                D = (),
                E = (),
                F = (),
                G = (),
                H = (),
                HandleResult = (),
            >,
        >,
    >,
}
// SAFETY: NOTE(review) — asserts that the service may be moved across
// threads. Its only field is an `Option<Arc<dyn Handler<..>>>` without a
// visible `Send + Sync` bound on the trait object; confirm that every handler
// installed here is thread-safe.
unsafe impl Send for MqttProxyService {}
impl<S: Socket> MqttBrokerService<S> for MqttProxyService {
    /// Forwards a subscribe request to the handler as `MqttEvent::Sub`.
    ///
    /// With a handler installed the future resolves to `Ok(())`, also when
    /// the connection has no session (nobody is notified then). Without a
    /// handler the future resolves to an error.
    fn subscribe(&self,
                 protocol: MqttBrokerProtocol,
                 connect: Arc<dyn MqttConnect<S>>,
                 topics: Vec<(String, u8)>) -> LocalBoxFuture<'static, Result<()>> {
        let handler = match &self.request_handler {
            Some(handler) => handler.clone(),
            None => {
                // No handler installed: report the subscribe as failed.
                return async move {
                    Err(Error::new(ErrorKind::Other,
                                   format!("Mqtt proxy subscribe failed, connect: {:?}, reason: handle subscribe error",
                                           connect)))
                }.boxed_local();
            },
        };

        async move {
            let mut handle = match connect.get_session() {
                Some(handle) => handle,
                None => return Ok(()),
            };
            let session = match handle.as_mut() {
                Some(session) => session,
                None => return Ok(()),
            };

            let connect_handle = MqttConnectHandle {
                gray: AtomicIsize::new(-1),
                client_id: session.get_client_id().clone(),
                protocol,
                connect,
                is_closed: AtomicBool::new(false),
            };
            let uid = connect_handle.get_id();
            let broker_name = connect_handle.protocol.get_broker_name().to_string();
            let event = MqttEvent::Sub(uid,
                                       broker_name,
                                       session.get_client_id().clone(),
                                       topics);

            handler.handle(Arc::new(connect_handle),
                           Atom::from(""),
                           Args::OneArgs(event)).await;
            Ok(())
        }.boxed_local()
    }

    /// Forwards an unsubscribe request to the handler as `MqttEvent::Unsub`.
    ///
    /// The session is looked up immediately. When the session or the handler
    /// is missing, the returned future resolves to an error.
    fn unsubscribe(&self,
                   protocol: MqttBrokerProtocol,
                   connect: Arc<dyn MqttConnect<S>>,
                   topics: Vec<String>) -> LocalBoxFuture<'static, Result<()>> {
        if let Some(mut handle) = connect.get_session() {
            if let (Some(session), Some(handler)) = (handle.as_mut(), &self.request_handler) {
                let connect_handle = MqttConnectHandle {
                    gray: AtomicIsize::new(-1),
                    client_id: session.get_client_id().clone(),
                    protocol,
                    connect,
                    is_closed: AtomicBool::new(false),
                };
                let uid = connect_handle.get_id();
                let broker_name = connect_handle.protocol.get_broker_name().to_string();
                let event = MqttEvent::Unsub(uid,
                                             broker_name,
                                             session.get_client_id().clone(),
                                             topics);
                let handler = handler.clone();

                return async move {
                    handler.handle(Arc::new(connect_handle),
                                   Atom::from(""),
                                   Args::OneArgs(event)).await;
                    Ok(())
                }.boxed_local();
            }
        }

        // Either the session or the handler is missing.
        async move {
            Err(Error::new(ErrorKind::Other,
                           format!("Mqtt proxy request failed, connect: {:?}, reason: handle unsubscribe error",
                                   connect)))
        }.boxed_local()
    }

    /// Forwards a published message to the handler as `MqttEvent::Publish`,
    /// including the remote address of the connection.
    ///
    /// The session is looked up immediately. When the session or the handler
    /// is missing, the returned future resolves to an error.
    fn publish(&self,
               protocol: MqttBrokerProtocol,
               connect: Arc<dyn MqttConnect<S>>,
               topic: String,
               payload: Arc<Vec<u8>>) -> LocalBoxFuture<'static, Result<()>> {
        if let Some(mut handle) = connect.get_session() {
            if let (Some(session), Some(handler)) = (handle.as_mut(), &self.request_handler) {
                let connect_handle = MqttConnectHandle {
                    gray: AtomicIsize::new(-1),
                    client_id: session.get_client_id().clone(),
                    protocol,
                    connect,
                    is_closed: AtomicBool::new(false),
                };
                let uid = connect_handle.get_id();
                let broker_name = connect_handle.protocol.get_broker_name().to_string();
                let client_id = session.get_client_id().clone();
                let remote = connect_handle.get_remote_addr();
                let event = MqttEvent::Publish(uid,
                                               broker_name,
                                               client_id,
                                               remote,
                                               topic,
                                               payload);
                let handler = handler.clone();

                return async move {
                    handler.handle(Arc::new(connect_handle),
                                   Atom::from(""),
                                   Args::OneArgs(event)).await;
                    Ok(())
                }.boxed_local();
            }
        }

        // Either the session or the handler is missing.
        async move {
            Err(Error::new(ErrorKind::Other,
                           format!("Mqtt proxy publish failed, connect: {:?}, reason: handle publish error",
                                   connect)))
        }.boxed_local()
    }
}
impl MqttProxyService {
    /// Creates a service without a handler.
    pub fn new() -> Self {
        Self::with_handler(None)
    }

    /// Creates a service that forwards its requests to `request_handler`.
    pub fn with_handler(request_handler: Option<Arc<dyn Handler<
        A = MqttEvent,
        B = (),
        C = (),
        D = (),
        E = (),
        F = (),
        G = (),
        H = (),
        HandleResult = ()>>>) -> Self {
        Self {
            request_handler: request_handler,
        }
    }

    /// Replaces the handler; passing `None` removes it.
    pub fn set_handler(&mut self,
                       handler: Option<Arc<dyn Handler<
                           A = MqttEvent,
                           B = (),
                           C = (),
                           D = (),
                           E = (),
                           F = (),
                           G = (),
                           H = (),
                           HandleResult = ()>>>) {
        self.request_handler = handler;
    }
}