use crate::{client::ConnId,
mqtt::{Packet, QoS, QosPid, SubscribeReturnCodes}};
use futures::prelude::*;
use log::*;
use serde::{Deserialize, Serialize};
use serde_json::{from_slice, to_string, Value};
use std::{collections::HashMap,
fs::OpenOptions,
io::{Error, Write},
process::{Command, Stdio},
sync::{Arc, Mutex}};
use tokio::sync::mpsc::{channel, Sender};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpMeta<'a> {
pub ts: String,
pub con: ConnId,
pub id: &'a str,
pub from: &'a str,
pub pkt: DumpMqtt,
}
pub type DumpPid = u16;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum DumpQos {
AtMostOnce,
AtLeastOnce,
ExactlyOnce,
}
impl From<QoS> for DumpQos {
fn from(q: QoS) -> Self {
match q {
QoS::AtMostOnce => Self::AtMostOnce,
QoS::AtLeastOnce => Self::AtLeastOnce,
QoS::ExactlyOnce => Self::ExactlyOnce,
}
}
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum DumpQosId {
AtMostOnce,
AtLeastOnce(DumpPid),
ExactlyOnce(DumpPid),
}
impl DumpQosId {
fn from(qp: QosPid) -> Self {
match qp {
QosPid::AtMostOnce => Self::AtMostOnce,
QosPid::AtLeastOnce(i) => Self::AtLeastOnce(i.get()),
QosPid::ExactlyOnce(i) => Self::ExactlyOnce(i.get()),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpConnack {
pub session: bool,
pub code: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpPublish {
pub dup: bool,
pub qos: DumpQosId,
pub topic: String,
pub pl: DumpPayload,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpPayload {
pub len: usize,
pub raw: Vec<u8>,
pub utf8: Option<String>,
pub json: Option<Value>,
pub err: Option<String>,
}
impl DumpPayload {
fn new(raw: Vec<u8>, decoder: &Option<String>) -> Self {
let len = raw.len();
let dec = match decoder {
None => Ok(raw.clone()),
Some(d) => spawn_cmd(&raw, d),
};
match dec {
Ok(d) => {
let utf8 = String::from_utf8(d.clone()).ok();
let json = from_slice(&d).ok();
Self { len, raw, utf8, json, err: None }
},
Err(e) => Self { len, raw, utf8: None, json: None, err: Some(e) },
}
}
}
fn spawn_cmd(raw: &Vec<u8>, cmd: &String) -> Result<Vec<u8>, String> {
let mut child = Command::new(cmd).stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| format!("Couldn't start {}: {:?}", cmd, e))?;
child.stdin
.take()
.unwrap()
.write_all(raw)
.map_err(|e| format!("Couldn't write to {}'s stdin: {:?}", cmd, e))?;
match child.wait_with_output() {
Ok(out) if out.status.success() && out.stderr.is_empty() => Ok(out.stdout),
Ok(out) if !out.stderr.is_empty() => Err(String::from_utf8_lossy(&out.stderr).into_owned()),
e => Err(format!("unexpected return from {}: {:?}", cmd, e)),
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpSubscribe {
pub pid: DumpPid,
pub topics: Vec<DumpSubscribeTopic>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpUnsubscribe {
pub pid: DumpPid,
pub topics: Vec<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpSubscribeTopic {
pub topic: String,
pub qos: DumpQos,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DumpSuback {
pub pid: DumpPid,
pub codes: Vec<DumpSubackcode>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DumpSubackcode {
AtMostOnce,
AtLeastOnce,
ExactlyOnce,
Failure,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DumpMqtt {
Connect(String),
Connack(DumpConnack),
Publish(DumpPublish),
Puback(DumpPid),
Pubrec(DumpPid),
Pubrel(DumpPid),
Pubcomp(DumpPid),
Subscribe(DumpSubscribe),
Suback(DumpSuback),
Unsubscribe(DumpUnsubscribe),
Unsuback(DumpPid),
Pingreq,
Pingresp,
Disconnect,
}
impl DumpMqtt {
pub fn kind(&self) -> &'static str {
match self {
Self::Connect(_) => "con",
Self::Connack(_) => "conack",
Self::Publish(_) => "pub",
Self::Puback(_) => "puback",
Self::Pubrec(_) => "pubrec",
Self::Pubrel(_) => "pubrel",
Self::Pubcomp(_) => "pubcomp",
Self::Subscribe(_) => "sub",
Self::Suback(_) => "suback",
Self::Unsubscribe(_) => "unsub",
Self::Unsuback(_) => "unsuback",
Self::Pingreq => "pingreq",
Self::Pingresp => "pingresp",
Self::Disconnect => "disco",
}
}
fn new(p: &Packet, decode_cmd: &Option<String>) -> Self {
match p {
Packet::Connect(p) => Self::Connect(p.client_id.clone()),
Packet::Connack(p) => Self::Connack(DumpConnack { session: p.session_present,
code: format!("{:?}", p.code) }),
Packet::Publish(p) => {
Self::Publish(DumpPublish { dup: p.dup,
qos: DumpQosId::from(p.qospid),
topic: p.topic_name.clone(),
pl: DumpPayload::new(p.payload.clone(), &decode_cmd) })
},
Packet::Puback(p) => Self::Puback(p.get()),
Packet::Pubrec(p) => Self::Pubrec(p.get()),
Packet::Pubrel(p) => Self::Pubrel(p.get()),
Packet::Pubcomp(p) => Self::Pubcomp(p.get()),
Packet::Subscribe(p) => {
let topics =
p.topics
.iter()
.map(|s| DumpSubscribeTopic { topic: s.topic_path.clone(), qos: s.qos.into() })
.collect();
Self::Subscribe(DumpSubscribe { pid: p.pid.get(), topics })
},
Packet::Suback(p) => {
let codes = p.return_codes
.iter()
.map(|c| match c {
SubscribeReturnCodes::Success(QoS::AtMostOnce) => {
DumpSubackcode::AtMostOnce
},
SubscribeReturnCodes::Success(QoS::AtLeastOnce) => {
DumpSubackcode::AtLeastOnce
},
SubscribeReturnCodes::Success(QoS::ExactlyOnce) => {
DumpSubackcode::ExactlyOnce
},
SubscribeReturnCodes::Failure => DumpSubackcode::Failure,
})
.collect();
Self::Suback(DumpSuback { pid: p.pid.get(), codes })
},
Packet::Unsubscribe(p) => {
Self::Unsubscribe(DumpUnsubscribe { pid: p.pid.get(), topics: p.topics.clone() })
},
Packet::Unsuback(p) => Self::Unsuback(p.get()),
Packet::Pingreq => Self::Pingreq,
Packet::Pingresp => Self::Pingresp,
Packet::Disconnect => Self::Disconnect,
}
}
}
#[derive(Clone)]
pub(crate) struct Dump {
reg: Arc<Mutex<HashMap<String, Sender<String>>>>,
chans: Vec<Sender<String>>,
decode_cmd: Option<String>,
prefix: String,
}
impl Dump {
pub fn new(decode_cmd: &Option<String>, prefix: &str) -> Self {
Dump { reg: Arc::new(Mutex::new(HashMap::new())),
chans: vec![],
decode_cmd: decode_cmd.clone(),
prefix: prefix.to_owned() }
}
pub fn register(&mut self, name: &str) -> Result<(), Error> {
let name = format!("{}{}", self.prefix, name);
let mut reg = self.reg.lock().expect("Aquire Dump.reg");
let s = match reg.get(&name) {
None => {
debug!("Opening dump file {}", name);
let mut f = OpenOptions::new().append(true).create(true).open(&name)?;
let (sx, mut rx) = channel::<String>(10);
reg.insert(name.clone(), sx.clone());
tokio::spawn(async move {
while let Some(s) = rx.next().await {
if let Err(e) = f.write_all(s.as_bytes()) {
error!("Writing to {}: {:?}", name, e);
}
}
});
sx
},
Some(s) => s.clone(),
};
self.chans.push(s);
Ok(())
}
fn now_str() -> String {
let t = time::OffsetDateTime::now();
format!("{}.{:06.06}Z", t.format("%FT%T"), t.microsecond())
}
pub async fn dump<'s>(&'s self, con: ConnId, id: &str, from: &str, pkt: &Packet) {
let ts = Dump::now_str();
let pkt = DumpMqtt::new(pkt, &self.decode_cmd);
let e = to_string(&DumpMeta { ts, con, id, from, pkt }).unwrap();
for c in self.chans.iter() {
c.clone().send(e.clone()).await.expect("Cannot send to chan");
}
}
}