log4rs_mqtt/
lib.rs

1//! MQTT Appender for log4rs.
2//! 
3//! This will format log messages, remove trailing newlines, and publish the message to an MQTT topic.
//! You can programmatically create one or use a log4rs.yml definition. In order to use the YAML
5//! you'll need to register the `MqttAppenderDeserializer` with `register`.
6//! 
7//! # Examples
8//! 
//! ## Create an MQTT logger programmatically
10//! ```
11//! let mqtt_log = MqttAppender::builder()
12//!     .topic("logs")
//!     .mqtt_client_id("log_client")
14//!     .build();
15//! let log_config = Config::builder()
16//!     .appender(Appender::builder().build("mqtt", Box::new(mqtt_log)))
17//!     .build(Root::builder().appender("mqtt").build(LevelFilter::Info))
18//!     .unwrap();
19//! log4rs::init_config(log_config).unwrap();
20//! ```
21//! 
22//! ## Create an MQTT logger with YAML
23//! ```yaml
24//! appenders:
25//!   mqtt:
26//!     kind: mqtt
27//!     mqtt_server: mqtt://mosquitto.local:1883
28//!     mqtt_client_id: app_logger
29//! root:
30//!   level: info
31//!   appenders:
32//!     - mqtt
33//! ```
34//! 
35//! # Warning
36//! Ensure that `paho_mqtt_c` and `paho_mqtt` log targets do not log with this appender, especially when logging DEBUG.
//! You'll get a recursive call to the MQTT manager setup and the RwLock will never release. If you see sudden
38//! application hangs, try decreasing the level or removing the MQTT logger and see if that fixes the problem.
39
40extern crate async_std;
41extern crate derivative;
42extern crate log;
43extern crate log4rs;
44extern crate paho_mqtt;
45
46use std::{io::BufWriter, time::Duration};
47use async_std::task::block_on;
48use derivative::Derivative;
49use log::Record;
50use log4rs::{encode::{EncoderConfig, Encode, pattern::PatternEncoder, self}, append::Append, config::{Deserialize, Deserializers}};
51use paho_mqtt as mqtt;
52
53#[derive(Clone, Eq, PartialEq, Hash, Debug, Default, serde::Deserialize)]
54#[serde(deny_unknown_fields)]
55/// Configuration structure for the MQTT appender
56pub struct MqttAppenderConfig {
57    topic: Option<String>,
58    qos: Option<i32>,
59    encoder: Option<EncoderConfig>,
60    mqtt_server: Option<String>,
61    mqtt_client_id: Option<String>,
62}
63
64#[derive(Derivative)]
65#[derivative(Debug)]
66/// Main MQTT appender structure
67pub struct MqttAppender {
68    topic: String,
69    qos: i32,
70    encoder: Box<dyn Encode>,
71    #[derivative(Debug="ignore")]
72    mqtt: mqtt::AsyncClient,
73}
74
75impl Append for MqttAppender {
76    /// Append to the MQTT stream.
77    /// 
78    /// This encodes the [`Record`] in a string buffer,
79    /// strips the trailing newline, then sends it to the MQTT topic.
80    fn append(&self, record: &Record) -> anyhow::Result<()> {
81        let mut buffer = StrBuilder { buf: BufWriter::new(Vec::new()) };
82        self.encoder.encode(&mut buffer, record)?;
83        let payload = String::from_utf8_lossy(buffer.buf.buffer()).to_string();
84        let message = mqtt::MessageBuilder::new()
85            .topic(self.topic.as_str())
86            .qos(self.qos)
87            .payload(payload.strip_suffix('\n').unwrap())
88            .finalize();
89        block_on(self.mqtt.publish(message))?;
90        Ok(())
91    }
92
93    /// Do nothing
94    fn flush(&self) {}
95}
96
97impl MqttAppender {
98    /// Create a new builder for MqttAppender.
99    pub fn builder() -> MqttAppenderBuilder {
100        MqttAppenderBuilder {
101            topic: None,
102            qos: None,
103            encoder: None,
104            mqtt_server: None,
105            mqtt_client_id: None,
106        }
107    }
108}
109
110/// Configuration builder.
111pub struct MqttAppenderBuilder {
112    topic: Option<String>,
113    qos: Option<i32>,
114    encoder: Option<Box<dyn Encode>>,
115    mqtt_server: Option<String>,
116    mqtt_client_id: Option<String>,
117}
118
119impl MqttAppenderBuilder {
120    /// Sets the output encoder for the `MqttAppender`.
121    pub fn encoder(mut self, encoder: Box<dyn Encode>) -> MqttAppenderBuilder {
122        self.encoder = Some(encoder);
123        self
124    }
125
126    /// Sets the MQTT topic to send logs to.
127    /// Defaults to "logging"
128    pub fn topic(mut self, topic: &str) -> MqttAppenderBuilder {
129        self.topic = Some(topic.to_string());
130        self
131    }
132
133    /// Sets the MQTT QOS to use when sending logs.
134    /// Defaults to 0.
135    pub fn qos(mut self, qos: i32) -> MqttAppenderBuilder {
136        self.qos = Some(qos);
137        self
138    }
139
140    /// Sets the MQTT server URI.
141    /// Defaults to mqtt://localhost:1883
142    pub fn mqtt_server(mut self, host: &str) -> MqttAppenderBuilder {
143        self.mqtt_server = Some(host.to_string());
144        self
145    }
146
147    /// Sets the MQTT client name.
148    /// Defaults to a randomly generated name.
149    pub fn mqtt_client_id(mut self, client_id: &str) -> MqttAppenderBuilder {
150        self.mqtt_client_id = Some(client_id.to_string());
151        self
152    }
153
154    /// Consumes the `MqttAppenderBuilder`, producing an `MqttAppender`.
155    pub fn build(self) -> MqttAppender {
156        let mut copts = mqtt::CreateOptionsBuilder::new()
157            .server_uri(self.mqtt_server.unwrap_or_else(|| "mqtt://localhost:1883".to_string()));
158        if let Some(client_id) = self.mqtt_client_id {
159            copts = copts.client_id(client_id);
160        }
161        let mqtt_client = mqtt::AsyncClient::new(copts.finalize()).expect("Unable to create MQTT client");
162        let opts = mqtt::ConnectOptionsBuilder::new()
163            .connect_timeout(Duration::from_secs(5))
164            .automatic_reconnect(Duration::from_secs(5), Duration::from_secs(300))
165            .finalize();
166        block_on(mqtt_client.connect(opts)).unwrap();
167
168        MqttAppender {
169            topic: self.topic.unwrap_or_else(|| "logging".to_string()),
170            qos: self.qos.unwrap_or(0),
171            encoder: self.encoder.unwrap_or_else(|| Box::<PatternEncoder>::default()),
172            mqtt: mqtt_client,
173        }
174    }
175}
176
177/// A deserializer for the `MqttAppender`.
178/// 
179/// # Configuration
180/// 
181/// ```yaml
182/// kind: mqtt
183/// 
184/// # The topic used to publish logs. Defaults to `logging`
185/// topic: log_messages
186/// 
187/// # The QOS value to use for MQTT publishing. Must be a valid QOS (0, 1, 2) and defaults to 0.
188/// qos: 1
189/// 
190/// # The encoder to use to format output. Defaults to `kind: pattern`.
191/// encoder:
192///   kind: pattern
193/// 
194/// # The MQTT server URI. If not specified, defaults to mqtt://localhost:1883
195/// mqtt_server: mqtt://localhost:1883
196/// 
197/// # The MQTT client ID. If not speficied, use the paho default.
198/// mqtt_client_id: app_logger
199/// ```
200#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Default)]
201pub struct MqttAppenderDeserializer;
202
203impl Deserialize for MqttAppenderDeserializer {
204    type Trait = dyn Append;
205
206    type Config = MqttAppenderConfig;
207
208    fn deserialize(&self, config: MqttAppenderConfig, deserializers: &Deserializers) -> anyhow::Result<Box<Self::Trait>> {
209        let mut appender = MqttAppender::builder();
210        if let Some(topic) = config.topic {
211            appender = appender.topic(topic.as_str());
212        }
213        if let Some(qos) = config.qos {
214            appender = appender.qos(qos);
215        }
216        if let Some(encoder) = config.encoder {
217            appender = appender.encoder(deserializers.deserialize(&encoder.kind, encoder.config)?);
218        }
219        if let Some(mqtt_server) = config.mqtt_server {
220            appender = appender.mqtt_server(mqtt_server.as_str());
221        }
222        if let Some(mqtt_client_id) = config.mqtt_client_id {
223            appender = appender.mqtt_client_id(mqtt_client_id.as_str());
224        }
225        Ok(Box::new(appender.build()))
226    }
227}
228
229/// Register deserializer for creating MQTT appender based on log4rs configuration file.
230pub fn register(deserializers: &mut log4rs::config::Deserializers) {
231    deserializers.insert("mqtt", MqttAppenderDeserializer);
232    deserializers.insert("mqtt", MqttAppenderDeserializer);
233}
234
235/// We need to do this to avoid E0117 since BufWriter and encode::Write are both external
236/// Le sigh.
237struct StrBuilder { buf: BufWriter<Vec<u8>> }
238
239impl std::io::Write for StrBuilder {
240    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
241        self.buf.write(buf)
242    }
243
244    fn flush(&mut self) -> std::io::Result<()> {
245        self.buf.flush()
246    }
247}
248impl encode::Write for StrBuilder {}