tracing_fluentd/
fluent.rs

1//!Fluentd forward protocol definitions.
2use serde::ser::{Serialize, Serializer, SerializeTuple, SerializeMap};
3
4use std::time;
5use core::fmt;
6use std::borrow::Cow;
7use std::collections::HashMap;
8
9#[derive(Clone)]
10#[repr(transparent)]
11///HashMap object suitable for fluent record.
12pub struct Map(HashMap<Cow<'static, str>, Value>);
13
14impl Map {
15    #[inline(always)]
16    ///Creates new empty map.
17    pub fn new() -> Self {
18        Self(HashMap::new())
19    }
20}
21
22impl core::fmt::Debug for Map {
23    #[inline(always)]
24    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
25        core::fmt::Debug::fmt(&self.0, fmt)
26    }
27}
28
29impl core::ops::Deref for Map {
30    type Target = HashMap<Cow<'static, str>, Value>;
31
32    #[inline(always)]
33    fn deref(&self) -> &Self::Target {
34        &self.0
35    }
36}
37
38impl core::ops::DerefMut for Map {
39    #[inline(always)]
40    fn deref_mut(&mut self) -> &mut Self::Target {
41        &mut self.0
42    }
43}
44
45#[derive(Debug)]
46pub(crate) struct Opts {
47    size: usize,
48}
49
50#[derive(Clone)]
51///Map value type.
52pub enum Value {
53    ///Boolean
54    Bool(bool),
55    ///Integer
56    Int(i64),
57    ///Unsigned integer
58    Uint(u64),
59    ///Float
60    Float(f64),
61    ///String
62    Str(&'static str),
63    ///Owned string
64    String(String),
65    ///Event level
66    EventLevel(tracing_core::Level),
67    ///Object
68    Object(Map),
69}
70
71impl From<bool> for Value {
72    #[inline(always)]
73    fn from(val: bool) -> Self {
74        Self::Bool(val)
75    }
76}
77
78impl From<i64> for Value {
79    #[inline(always)]
80    fn from(val: i64) -> Self {
81        Self::Int(val)
82    }
83}
84
85impl From<u32> for Value {
86    #[inline(always)]
87    fn from(val: u32) -> Self {
88        Self::Uint(val as _)
89    }
90}
91
92impl From<u64> for Value {
93    #[inline(always)]
94    fn from(val: u64) -> Self {
95        Self::Uint(val)
96    }
97}
98
99impl From<f64> for Value {
100    #[inline(always)]
101    fn from(val: f64) -> Self {
102        Self::Float(val)
103    }
104}
105
106impl From<&'static str> for Value {
107    #[inline(always)]
108    fn from(val: &'static str) -> Self {
109        Self::Str(val)
110    }
111}
112
113impl From<String> for Value {
114    #[inline(always)]
115    fn from(val: String) -> Self {
116        Self::String(val)
117    }
118}
119
120impl From<tracing::Level> for Value {
121    #[inline(always)]
122    fn from(val: tracing::Level) -> Self {
123        Self::EventLevel(val)
124    }
125}
126
127impl From<Map> for Value {
128    #[inline(always)]
129    fn from(val: Map) -> Self {
130        Self::Object(val)
131    }
132}
133
134impl fmt::Debug for Value {
135    #[inline(always)]
136    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
137        match self {
138            Value::Bool(val) => fmt::Display::fmt(val, fmt),
139            Value::Int(val) => fmt::Display::fmt(val, fmt),
140            Value::Uint(val) => fmt::Display::fmt(val, fmt),
141            Value::Float(val) => fmt::Display::fmt(val, fmt),
142            Value::EventLevel(val) => fmt::Debug::fmt(val, fmt),
143            Value::Str(val) => fmt::Debug::fmt(val, fmt),
144            Value::String(val) => fmt::Debug::fmt(val, fmt),
145            Value::Object(val) => fmt::Debug::fmt(val, fmt),
146        }
147    }
148}
149
150#[derive(Debug)]
151///Representation of fluent entry within `Message`
152pub struct Record {
153    time: time::Duration,
154    entries: Map,
155}
156
157impl Record {
158    #[inline(always)]
159    ///Creates record with current timestamp
160    pub fn now() -> Self {
161        let time = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH) {
162            Ok(time) => time,
163            Err(_) => panic!("SystemTime is before UNIX!?"),
164        };
165
166        Self {
167            time,
168            entries: Map::new(),
169        }
170    }
171
172    #[inline(always)]
173    ///Merges record entries with provided map
174    pub fn update(&mut self, other: &Map) {
175        for (key, value) in other.iter() {
176            if !self.entries.contains_key(key) {
177                self.entries.insert(key.clone(), value.clone());
178            }
179        }
180    }
181}
182
183impl core::ops::Deref for Record {
184    type Target = Map;
185
186    #[inline(always)]
187    fn deref(&self) -> &Self::Target {
188        &self.entries
189    }
190}
191
192impl core::ops::DerefMut for Record {
193    #[inline(always)]
194    fn deref_mut(&mut self) -> &mut Self::Target {
195        &mut self.entries
196    }
197}
198
199#[derive(Debug)]
200///Forward mode message.
201pub struct Message {
202    tag: &'static str,
203    entries: Vec<Record>,
204    opts: Opts,
205    //option
206}
207
208impl Message {
209    #[inline(always)]
210    ///Creates new message with provided tag.
211    pub const fn new(tag: &'static str) -> Self {
212        Self {
213            tag,
214            entries: Vec::new(),
215            opts: Opts {
216                size: 0,
217            }
218        }
219    }
220
221    #[inline(always)]
222    ///Adds record to the message.
223    pub fn add(&mut self, record: Record) {
224        self.entries.push(record);
225        self.opts.size += 1;
226    }
227
228    #[inline(always)]
229    ///Returns number of records inside message.
230    pub fn len(&self) -> usize {
231        self.entries.len()
232    }
233
234    #[inline(always)]
235    ///Clears records from the message
236    pub fn clear(&mut self) {
237        self.entries.clear();
238        self.opts.size = 0;
239    }
240}
241
242fn tracing_level_to_str(level: tracing_core::Level) -> &'static str {
243    if level == tracing_core::Level::ERROR {
244        "ERROR"
245    } else if level == tracing_core::Level::WARN {
246        "WARN"
247    } else if level == tracing_core::Level::INFO {
248        "INFO"
249    } else if level == tracing_core::Level::DEBUG {
250        "DEBUG"
251    } else {
252        "TRACE"
253    }
254}
255
256impl Serialize for Value {
257    #[inline]
258    fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
259        match self {
260            Value::Bool(val) => ser.serialize_bool(*val),
261            Value::Int(val) => ser.serialize_i64(*val),
262            Value::Uint(val) => ser.serialize_u64(*val),
263            Value::Float(val) => ser.serialize_f64(*val),
264            Value::EventLevel(val) => ser.serialize_str(tracing_level_to_str(*val)),
265            Value::Str(val) => ser.serialize_str(val),
266            Value::String(val) => ser.serialize_str(val),
267            Value::Object(val) => {
268                let mut map = ser.serialize_map(Some(val.len()))?;
269                for (key, value) in val.iter() {
270                    map.serialize_entry(key, value)?;
271                }
272                map.end()
273            },
274        }
275    }
276}
277
278impl Serialize for Map {
279    #[inline]
280    fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
281        let mut map = ser.serialize_map(Some(self.0.len()))?;
282        for (key, value) in self.0.iter() {
283            map.serialize_entry(key, value)?;
284        }
285        map.end()
286    }
287}
288
289impl Serialize for Opts {
290    #[inline]
291    fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
292        let mut map = ser.serialize_map(Some(1))?;
293        map.serialize_entry("size", &self.size)?;
294        map.end()
295    }
296}
297
298impl Serialize for Record {
299    #[inline]
300    fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
301        let mut seq = ser.serialize_tuple(2)?;
302
303        let seconds = self.time.as_secs();
304        #[cfg(feature = "event_time")]
305        {
306            struct Int8([u8; 8]);
307
308            impl Serialize for Int8 {
309                #[inline]
310                fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
311                    serializer.serialize_bytes(&self.0)
312                }
313            }
314
315            //rmpv derives extension type of bytes size
316            struct ExtType((i8, Int8));
317
318            impl Serialize for ExtType {
319                #[inline]
320                fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
321                    use rmp_serde::MSGPACK_EXT_STRUCT_NAME;
322
323                    serializer.serialize_newtype_struct(MSGPACK_EXT_STRUCT_NAME, &self.0)
324                }
325            }
326
327            //seq.serialize_element(&self.time.as_secs())?;
328            //
329            //Serialize time as EventTime ext
330            //https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#eventtime-ext-format
331            //This is valid up to year 2106
332            let nanos = self.time.subsec_nanos();
333            let seconds = (seconds as u32).to_be_bytes();
334            let nanos = nanos.to_be_bytes();
335            let time = [seconds[0], seconds[1], seconds[2], seconds[3], nanos[0], nanos[1], nanos[2], nanos[3]];
336            let time = ExtType((0, Int8(time)));
337            seq.serialize_element(&time)?;
338        }
339        #[cfg(not(feature = "event_time"))]
340        {
341            seq.serialize_element(&seconds)?;
342        }
343
344        seq.serialize_element(&self.entries)?;
345        seq.end()
346    }
347}
348
349impl Serialize for Message {
350    #[inline]
351    fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
352        let mut seq = ser.serialize_tuple(3)?;
353        seq.serialize_element(&self.tag)?;
354        seq.serialize_element(&self.entries)?;
355        seq.serialize_element(&self.opts)?;
356        seq.end()
357    }
358}