1extern crate async_std;
41extern crate derivative;
42extern crate log;
43extern crate log4rs;
44extern crate paho_mqtt;
45
46use std::{io::BufWriter, time::Duration};
47use async_std::task::block_on;
48use derivative::Derivative;
49use log::Record;
50use log4rs::{encode::{EncoderConfig, Encode, pattern::PatternEncoder, self}, append::Append, config::{Deserialize, Deserializers}};
51use paho_mqtt as mqtt;
52
53#[derive(Clone, Eq, PartialEq, Hash, Debug, Default, serde::Deserialize)]
54#[serde(deny_unknown_fields)]
55pub struct MqttAppenderConfig {
57 topic: Option<String>,
58 qos: Option<i32>,
59 encoder: Option<EncoderConfig>,
60 mqtt_server: Option<String>,
61 mqtt_client_id: Option<String>,
62}
63
64#[derive(Derivative)]
65#[derivative(Debug)]
66pub struct MqttAppender {
68 topic: String,
69 qos: i32,
70 encoder: Box<dyn Encode>,
71 #[derivative(Debug="ignore")]
72 mqtt: mqtt::AsyncClient,
73}
74
75impl Append for MqttAppender {
76 fn append(&self, record: &Record) -> anyhow::Result<()> {
81 let mut buffer = StrBuilder { buf: BufWriter::new(Vec::new()) };
82 self.encoder.encode(&mut buffer, record)?;
83 let payload = String::from_utf8_lossy(buffer.buf.buffer()).to_string();
84 let message = mqtt::MessageBuilder::new()
85 .topic(self.topic.as_str())
86 .qos(self.qos)
87 .payload(payload.strip_suffix('\n').unwrap())
88 .finalize();
89 block_on(self.mqtt.publish(message))?;
90 Ok(())
91 }
92
93 fn flush(&self) {}
95}
96
97impl MqttAppender {
98 pub fn builder() -> MqttAppenderBuilder {
100 MqttAppenderBuilder {
101 topic: None,
102 qos: None,
103 encoder: None,
104 mqtt_server: None,
105 mqtt_client_id: None,
106 }
107 }
108}
109
110pub struct MqttAppenderBuilder {
112 topic: Option<String>,
113 qos: Option<i32>,
114 encoder: Option<Box<dyn Encode>>,
115 mqtt_server: Option<String>,
116 mqtt_client_id: Option<String>,
117}
118
119impl MqttAppenderBuilder {
120 pub fn encoder(mut self, encoder: Box<dyn Encode>) -> MqttAppenderBuilder {
122 self.encoder = Some(encoder);
123 self
124 }
125
126 pub fn topic(mut self, topic: &str) -> MqttAppenderBuilder {
129 self.topic = Some(topic.to_string());
130 self
131 }
132
133 pub fn qos(mut self, qos: i32) -> MqttAppenderBuilder {
136 self.qos = Some(qos);
137 self
138 }
139
140 pub fn mqtt_server(mut self, host: &str) -> MqttAppenderBuilder {
143 self.mqtt_server = Some(host.to_string());
144 self
145 }
146
147 pub fn mqtt_client_id(mut self, client_id: &str) -> MqttAppenderBuilder {
150 self.mqtt_client_id = Some(client_id.to_string());
151 self
152 }
153
154 pub fn build(self) -> MqttAppender {
156 let mut copts = mqtt::CreateOptionsBuilder::new()
157 .server_uri(self.mqtt_server.unwrap_or_else(|| "mqtt://localhost:1883".to_string()));
158 if let Some(client_id) = self.mqtt_client_id {
159 copts = copts.client_id(client_id);
160 }
161 let mqtt_client = mqtt::AsyncClient::new(copts.finalize()).expect("Unable to create MQTT client");
162 let opts = mqtt::ConnectOptionsBuilder::new()
163 .connect_timeout(Duration::from_secs(5))
164 .automatic_reconnect(Duration::from_secs(5), Duration::from_secs(300))
165 .finalize();
166 block_on(mqtt_client.connect(opts)).unwrap();
167
168 MqttAppender {
169 topic: self.topic.unwrap_or_else(|| "logging".to_string()),
170 qos: self.qos.unwrap_or(0),
171 encoder: self.encoder.unwrap_or_else(|| Box::<PatternEncoder>::default()),
172 mqtt: mqtt_client,
173 }
174 }
175}
176
177#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Default)]
201pub struct MqttAppenderDeserializer;
202
203impl Deserialize for MqttAppenderDeserializer {
204 type Trait = dyn Append;
205
206 type Config = MqttAppenderConfig;
207
208 fn deserialize(&self, config: MqttAppenderConfig, deserializers: &Deserializers) -> anyhow::Result<Box<Self::Trait>> {
209 let mut appender = MqttAppender::builder();
210 if let Some(topic) = config.topic {
211 appender = appender.topic(topic.as_str());
212 }
213 if let Some(qos) = config.qos {
214 appender = appender.qos(qos);
215 }
216 if let Some(encoder) = config.encoder {
217 appender = appender.encoder(deserializers.deserialize(&encoder.kind, encoder.config)?);
218 }
219 if let Some(mqtt_server) = config.mqtt_server {
220 appender = appender.mqtt_server(mqtt_server.as_str());
221 }
222 if let Some(mqtt_client_id) = config.mqtt_client_id {
223 appender = appender.mqtt_client_id(mqtt_client_id.as_str());
224 }
225 Ok(Box::new(appender.build()))
226 }
227}
228
229pub fn register(deserializers: &mut log4rs::config::Deserializers) {
231 deserializers.insert("mqtt", MqttAppenderDeserializer);
232 deserializers.insert("mqtt", MqttAppenderDeserializer);
233}
234
235struct StrBuilder { buf: BufWriter<Vec<u8>> }
238
239impl std::io::Write for StrBuilder {
240 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
241 self.buf.write(buf)
242 }
243
244 fn flush(&mut self) -> std::io::Result<()> {
245 self.buf.flush()
246 }
247}
248impl encode::Write for StrBuilder {}