lading/payload/
fluent.rs

1//! Fluentd payload.
2//!
3//! Implements [this
4//! protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1).
5use std::{collections::HashMap, io::Write};
6
7use rand::{distributions::Standard, prelude::Distribution, Rng};
8use serde_tuple::Serialize_tuple;
9
10use super::{common::AsciiString, Generator};
11use crate::payload::{Error, Serialize};
12
/// Generator handle for Fluentd forward-protocol payloads.
///
/// The struct has no fields; it only selects the Fluent implementation of
/// the payload `Serialize` trait.
#[derive(Debug, Default, Clone, Copy)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub(crate) struct Fluent {}
16
/// A flat map from string keys to single-byte values, used as the nested
/// "object" form of a record value.
pub(crate) type Object = HashMap<String, u8>;
18
19#[derive(serde::Serialize)]
20#[serde(untagged)]
21enum RecordValue {
22    String(String),
23    Object(Object),
24}
25
26impl Distribution<RecordValue> for Standard {
27    fn sample<R>(&self, rng: &mut R) -> RecordValue
28    where
29        R: Rng + ?Sized,
30    {
31        match rng.gen_range(0..2) {
32            0 => RecordValue::String(AsciiString::default().generate(rng)),
33            1 => {
34                let mut obj = HashMap::new();
35                for _ in 0..rng.gen_range(0..128) {
36                    let key = AsciiString::default().generate(rng);
37                    let val = rng.gen();
38
39                    obj.insert(key, val);
40                }
41                RecordValue::Object(obj)
42            }
43            _ => unreachable!(),
44        }
45    }
46}
47
48#[derive(Serialize_tuple)]
49struct Entry {
50    time: u32,
51    record: HashMap<String, RecordValue>, // always contains 'message' and 'event' -> object key
52}
53
54impl Distribution<Entry> for Standard {
55    fn sample<R>(&self, rng: &mut R) -> Entry
56    where
57        R: Rng + ?Sized,
58    {
59        let mut rec = HashMap::new();
60        rec.insert(String::from("message"), rng.gen());
61        rec.insert(String::from("event"), rng.gen());
62        for _ in 0..rng.gen_range(0..128) {
63            let key = AsciiString::default().generate(rng);
64            let val = rng.gen();
65
66            rec.insert(key, val);
67        }
68        Entry {
69            time: rng.gen(),
70            record: rec,
71        }
72    }
73}
74
75#[derive(Serialize_tuple)]
76struct FluentForward {
77    tag: String,
78    entries: Vec<Entry>,
79}
80
81impl Distribution<FluentForward> for Standard {
82    fn sample<R>(&self, rng: &mut R) -> FluentForward
83    where
84        R: Rng + ?Sized,
85    {
86        let total_entries = rng.gen_range(0..32);
87        FluentForward {
88            tag: AsciiString::default().generate(rng),
89            entries: rng.sample_iter(Standard).take(total_entries).collect(),
90        }
91    }
92}
93
94#[derive(serde::Serialize)]
95struct FluentMessage {
96    tag: String,
97    time: u32,
98    record: HashMap<String, RecordValue>, // always contains 'message' key
99}
100
101impl Distribution<FluentMessage> for Standard {
102    fn sample<R>(&self, rng: &mut R) -> FluentMessage
103    where
104        R: Rng + ?Sized,
105    {
106        let mut rec = HashMap::new();
107        rec.insert(String::from("message"), rng.gen());
108        for _ in 0..rng.gen_range(0..128) {
109            let key = AsciiString::default().generate(rng);
110            let val = rng.gen();
111
112            rec.insert(key, val);
113        }
114        FluentMessage {
115            tag: AsciiString::default().generate(rng),
116            time: rng.gen(),
117            record: rec,
118        }
119    }
120}
121
122#[derive(serde::Serialize)]
123#[serde(untagged)]
124enum Member {
125    Message(FluentMessage),
126    Forward(FluentForward),
127}
128
129impl Distribution<Member> for Standard {
130    fn sample<R>(&self, rng: &mut R) -> Member
131    where
132        R: Rng + ?Sized,
133    {
134        match rng.gen_range(0..2) {
135            0 => Member::Message(rng.gen()),
136            1 => Member::Forward(rng.gen()),
137            _ => unimplemented!(),
138        }
139    }
140}
141
142impl Serialize for Fluent {
143    fn to_bytes<W, R>(&self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error>
144    where
145        W: Write,
146        R: Rng + Sized,
147    {
148        if max_bytes < 16 {
149            // 16 is just an arbitrarily big constant
150            return Ok(());
151        }
152
153        // We will arbitrarily generate 1_000 Member instances and then
154        // serialize. If this is below `max_bytes` we'll add more until we're
155        // over. Once we are we'll start removing instances until we're back
156        // below the limit.
157
158        let mut members: Vec<Vec<u8>> = Standard
159            .sample_iter(&mut rng)
160            .take(10)
161            .map(|m: Member| rmp_serde::to_vec(&m).unwrap())
162            .collect();
163
164        // Search for too many Member instances.
165        loop {
166            let encoding_len = members[0..].iter().fold(0, |acc, m| acc + m.len());
167            if encoding_len > max_bytes {
168                break;
169            }
170
171            members.extend(
172                Standard
173                    .sample_iter(&mut rng)
174                    .take(10)
175                    .map(|m: Member| rmp_serde::to_vec(&m).unwrap()),
176            );
177        }
178
179        // Search for an encoding that's just right.
180        let mut high = members.len();
181        loop {
182            let encoding_len = members[0..high].iter().fold(0, |acc, m| acc + m.len());
183
184            if encoding_len > max_bytes {
185                high /= 2;
186            } else {
187                for m in &members[0..high] {
188                    writer.write_all(m)?;
189                }
190                break;
191            }
192        }
193        Ok(())
194    }
195}
196
#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use rand::{rngs::SmallRng, SeedableRng};

    use crate::payload::{Fluent, Serialize};

    // The serialized payload must never be larger than the `max_bytes` budget
    // passed to `to_bytes`.
    proptest! {
        #[test]
        fn payload_not_exceed_max_bytes(seed: u64, max_bytes: u16) {
            let max_bytes = usize::from(max_bytes);
            let rng = SmallRng::seed_from_u64(seed);
            let fluent = Fluent::default();

            let mut bytes = Vec::with_capacity(max_bytes);
            fluent.to_bytes(rng, max_bytes, &mut bytes).unwrap();

            // `prop_assert!` is checked in every build profile, unlike
            // `debug_assert!`. The message reports lengths only, because the
            // payload is binary MessagePack and need not be valid UTF-8.
            prop_assert!(
                bytes.len() <= max_bytes,
                "wrote {} bytes with a budget of {}",
                bytes.len(),
                max_bytes
            );
        }
    }
}