pub use mqttc as client;
use self::client::{PubSub, PubOpt};
use super::{Listener, Instruments};
use super::ser::{InstantiateSerializer, IntoWriter};
use serde::Serializer;
use std::sync::mpsc;
/// Messages sent from a `Handle` to the `Publisher` run loop over the channel.
enum Message {
/// The instrument with the given name has been updated and its reading
/// should be serialized and considered for publishing.
Update(&'static str),
/// Asks the run loop to stop.
Shutdown,
}
/// Maps an instrument name to the topic its readings are published on.
pub trait TopicFormatter {
    /// Returns the topic to publish the instrument called `name` on.
    fn format_topic(&self, name: &'static str) -> String;
}

/// The unit formatter uses the instrument name itself, unchanged, as the topic.
impl TopicFormatter for () {
    fn format_topic(&self, name: &'static str) -> String {
        String::from(name)
    }
}
/// Publishes instrument readings through an `mqttc` client whenever a
/// `Handle` reports that an instrument has been updated.
pub struct Publisher<TF: TopicFormatter, I: Instruments<Handle>> {
/// Turns an instrument name into the topic it is published on.
topic_formatter: TF,
/// Client used to publish the serialized readings.
client: client::Client,
/// Instruments whose readings are serialized and published.
instruments: I,
/// When `true`, messages are published with `PubOpt::retain()`,
/// otherwise with `PubOpt::at_least_once()`.
retain: bool,
/// Sending half of the message channel; cloned into every `Handle`.
sender: mpsc::Sender<Message>,
/// Receiving half of the message channel, drained by `run`.
receiver: mpsc::Receiver<Message>,
}
impl<TF: TopicFormatter, I: Instruments<Handle>> Publisher<TF, I> {
    /// Creates a publisher and registers it as the listener of `instruments`.
    ///
    /// * `topic_formatter` - maps instrument names to publish topics.
    /// * `client` - client used to publish the serialized readings.
    /// * `instruments` - instruments to publish; a `Handle` connected to this
    ///   publisher is wired into them via `wire_listener`.
    /// * `retain` - publish with `PubOpt::retain()` when `true`, otherwise
    ///   with `PubOpt::at_least_once()`.
    pub fn new(topic_formatter: TF, client: client::Client, mut instruments: I, retain: bool) -> Self {
        let (sender, receiver) = mpsc::channel();
        // The instruments get their own clone of the sender so that their
        // update notifications arrive on `receiver`.
        let handle = Handle { sender: sender.clone() };
        instruments.wire_listener(handle);
        Publisher {
            topic_formatter,
            client,
            instruments,
            retain,
            sender,
            receiver,
        }
    }

    /// Returns a reference to the instruments owned by this publisher.
    pub fn instruments(&self) -> &I {
        &self.instruments
    }

    /// Returns a new handle that sends messages to this publisher.
    pub fn handle(&self) -> Handle {
        Handle { sender: self.sender.clone() }
    }

    /// Processes messages until a shutdown message is received.
    ///
    /// For every update message the named instrument's reading is serialized
    /// with a serializer obtained from `is`. The resulting bytes are published
    /// on the topic produced by the topic formatter, but only when their hash
    /// differs from the hash recorded for the previous update of the same
    /// instrument (the first update of an instrument is always published).
    ///
    /// # Panics
    ///
    /// Panics if serializing a reading fails, if publishing fails, or if
    /// receiving from the message channel fails.
    pub fn run<IS, S>(&mut self, is: IS)
    where
        for<'a> IS: InstantiateSerializer<'a, Vec<u8>, Target = S>,
        S: IntoWriter<Vec<u8>>,
        for<'a> &'a mut S: Serializer,
    {
        use std::collections::hash_map::DefaultHasher;
        use std::collections::HashMap;
        use std::hash::{Hash, Hasher};

        // Hash of the most recent payload per instrument name, used to
        // suppress publishing a payload identical to the previous one.
        let mut last_messages = HashMap::new();
        let pubopt = if self.retain {
            PubOpt::retain()
        } else {
            PubOpt::at_least_once()
        };

        loop {
            match self.receiver.recv() {
                Ok(Message::Shutdown) => break,
                Ok(Message::Update(name)) => {
                    let mut ser = is.instantiate_serializer(Vec::with_capacity(64));
                    let _ = self.instruments.serialize_reading(name, &mut ser).unwrap();
                    let vec: Vec<u8> = ser.into_writer();

                    let mut hasher = DefaultHasher::new();
                    vec.hash(&mut hasher);
                    let hash = hasher.finish();

                    // `insert` returns the previously stored hash, so the
                    // payload changed exactly when that is not `Some(hash)`.
                    // The new hash is recorded either way.
                    if last_messages.insert(name, hash) != Some(hash) {
                        let topic = self.topic_formatter.format_topic(name);
                        let _ = self.client.publish(topic, vec, pubopt).unwrap();
                    }
                }
                // A format string is required here: `panic!(err)` with a
                // non-literal payload is rejected by the 2021 edition and
                // yields an opaque panic message on older ones.
                Err(err) => panic!("{}", err),
            }
        }
    }

    /// Consumes the publisher and returns the underlying client.
    pub fn into_inner(self) -> client::Client {
        self.client
    }
}
/// Cloneable handle used to send messages to a `Publisher`.
#[derive(Clone)]
pub struct Handle {
/// Sending half of the publisher's message channel.
sender: mpsc::Sender<Message>,
}
impl Handle {
    /// Sends a shutdown message, which makes the publisher's run loop stop
    /// once it is received.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be sent on the channel.
    pub fn shutdown(&self) {
        self.sender.send(Message::Shutdown).unwrap();
    }
}
/// Forwards instrument update notifications to the publisher.
impl Listener for Handle {
    /// Sends an update message carrying the instrument's `name`.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be sent on the channel.
    fn instrument_updated(&self, name: &'static str) {
        let update = Message::Update(name);
        self.sender.send(update).unwrap();
    }
}