vrl 0.32.0

Vector Remap Language
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
use crate::compiler::prelude::*;
use std::collections::BTreeMap;

/// Runtime entry point: converts the resolved arguments to text and parses the record.
///
/// `value` (and `format`, when given) must be byte strings. Invalid UTF-8 is replaced
/// lossily rather than rejected. A non-bytes argument fails via `try_bytes`; a malformed
/// record fails via `parse_log`, whose message is converted into an expression error.
fn parse_aws_vpc_flow_log(value: Value, format: Option<Value>) -> Resolved {
    let record_bytes = value.try_bytes()?;
    let record = String::from_utf8_lossy(&record_bytes);

    // Resolve the optional format to bytes first so a type error surfaces (after any
    // error from `value`, matching the original evaluation order).
    let format_bytes = format.map(|fmt| fmt.try_bytes()).transpose()?;
    let format_text = format_bytes.as_deref().map(String::from_utf8_lossy);

    parse_log(&record, format_text.as_deref()).map_err(Into::into)
}

/// VRL function `parse_aws_vpc_flow_log`: parses one AWS VPC Flow Log record into an object,
/// optionally using a caller-supplied format (ordered list of field names).
#[derive(Debug, Clone, Copy)]
pub struct ParseAwsVpcFlowLog;

impl Function for ParseAwsVpcFlowLog {
    /// Name under which the function is exposed to VRL programs.
    fn identifier(&self) -> &'static str {
        "parse_aws_vpc_flow_log"
    }

    /// One-line description shown in the function reference.
    fn usage(&self) -> &'static str {
        "Parses `value` in the [VPC Flow Logs format](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html)."
    }

    /// Documentation category this function is listed under.
    fn category(&self) -> &'static str {
        Category::Parse.as_ref()
    }

    /// The function always produces an object on success.
    fn return_kind(&self) -> u16 {
        kind::OBJECT
    }

    /// Human-readable reasons the function can fail at runtime.
    fn internal_failure_reasons(&self) -> &'static [&'static str] {
        &["`value` is not a properly formatted AWS VPC Flow log."]
    }

    /// `value` is required; `format` is optional. Both must be bytes.
    fn parameters(&self) -> &'static [Parameter] {
        const PARAMETERS: &[Parameter] = &[
            Parameter::required("value", kind::BYTES, "VPC Flow Log."),
            Parameter::optional("format", kind::BYTES, "VPC Flow Log format."),
        ];
        PARAMETERS
    }

    /// Wraps the compiled argument expressions in a `ParseAwsVpcFlowLogFn`.
    fn compile(
        &self,
        _state: &state::TypeState,
        _ctx: &mut FunctionCompileContext,
        arguments: ArgumentList,
    ) -> Compiled {
        let expression = ParseAwsVpcFlowLogFn::new(
            arguments.required("value"),
            arguments.optional("format"),
        );

        Ok(expression.as_expr())
    }

    /// Worked examples rendered in the documentation and exercised by example tests.
    fn examples(&self) -> &'static [Example] {
        &[
            example! {
                title: "Parse AWS VPC Flow log (default format)",
                source: r#"parse_aws_vpc_flow_log!("2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA")"#,
                result: Ok(indoc! { r#"{
                    "version": 2,
                    "account_id": "123456789010",
                    "interface_id": "eni-1235b8ca123456789",
                    "srcaddr": null,
                    "dstaddr": null,
                    "srcport": null,
                    "dstport": null,
                    "protocol": null,
                    "packets": null,
                    "bytes": null,
                    "start": 1431280876,
                    "end": 1431280934,
                    "action": null,
                    "log_status":"NODATA"
                }"# }),
            },
            example! {
                title: "Parse AWS VPC Flow log (custom format)",
                source: indoc! {r#"
                    parse_aws_vpc_flow_log!(
                        "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
                        "instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr"
                    )
                "#},
                result: Ok(indoc! { r#"{
                    "instance_id": null,
                    "interface_id": "eni-1235b8ca123456789",
                    "srcaddr": "10.0.1.5",
                    "dstaddr": "10.0.0.220",
                    "pkt_srcaddr": "10.0.1.5",
                    "pkt_dstaddr": "203.0.113.5"
                }"# }),
            },
            example! {
                title: "Parse AWS VPC Flow log including v5 fields",
                source: indoc! {r#"
                    parse_aws_vpc_flow_log!(
                        "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
                        format: "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status"
                    )
                "#},
                result: Ok(indoc! { r#"{
                    "account_id": "123456789012",
                    "action": "ACCEPT",
                    "az_id": "apse2-az3",
                    "bytes": 15044,
                    "dstaddr": "10.0.0.71",
                    "dstport": 34210,
                    "end": 1616729349,
                    "flow_direction": "ingress",
                    "instance_id": "i-0c50d5961bcb2d47b",
                    "interface_id": "eni-1235b8ca123456789",
                    "log_status": "OK",
                    "packets": 14,
                    "pkt_dst_aws_service": null,
                    "pkt_dstaddr": "10.0.0.71",
                    "pkt_src_aws_service": "S3",
                    "pkt_srcaddr": "52.95.128.179",
                    "protocol": 6,
                    "region": "ap-southeast-2",
                    "srcaddr": "52.95.128.179",
                    "srcport": 80,
                    "start": 1616729292,
                    "sublocation_id": null,
                    "sublocation_type": null,
                    "subnet_id": "subnet-aaaaaaaa012345678",
                    "tcp_flags": 19,
                    "traffic_path": null,
                    "type": "IPv4",
                    "version": 5,
                    "vpc_id": "vpc-abcdefab012345678"
                }"# }),
            },
        ]
    }
}

/// Compiled form of a `parse_aws_vpc_flow_log` call, holding its argument expressions.
#[derive(Clone, Debug)]
struct ParseAwsVpcFlowLogFn {
    /// Expression that yields the raw flow log record.
    value: Box<dyn Expression>,
    /// Optional expression that yields the format string (ordered field names).
    format: Option<Box<dyn Expression>>,
}

impl ParseAwsVpcFlowLogFn {
    /// Bundles the compiled argument expressions; no validation happens here.
    fn new(value: Box<dyn Expression>, format: Option<Box<dyn Expression>>) -> Self {
        ParseAwsVpcFlowLogFn { value, format }
    }
}

impl FunctionExpression for ParseAwsVpcFlowLogFn {
    /// Resolves `value`, then `format` (if present), and parses the record.
    fn resolve(&self, ctx: &mut Context) -> Resolved {
        let record = self.value.resolve(ctx)?;

        let format = match &self.format {
            Some(expression) => Some(expression.resolve(ctx)?),
            None => None,
        };

        parse_aws_vpc_flow_log(record, format)
    }

    /// Always an object of the known flow-log fields; fallible because parsing can fail.
    fn type_def(&self, _state: &state::TypeState) -> TypeDef {
        TypeDef::object(inner_kind()).fallible(/* log parsing error */)
    }
}

fn inner_kind() -> BTreeMap<Field, Kind> {
    BTreeMap::from([
        (Field::from("account_id"), Kind::bytes() | Kind::null()),
        (Field::from("action"), Kind::bytes() | Kind::null()),
        (Field::from("az_id"), Kind::bytes() | Kind::null()),
        (Field::from("bytes"), Kind::integer() | Kind::null()),
        (Field::from("dstaddr"), Kind::bytes() | Kind::null()),
        (Field::from("dstport"), Kind::integer() | Kind::null()),
        (Field::from("end"), Kind::integer() | Kind::null()),
        (Field::from("flow_direction"), Kind::bytes() | Kind::null()),
        (Field::from("instance_id"), Kind::bytes() | Kind::null()),
        (Field::from("interface_id"), Kind::bytes() | Kind::null()),
        (Field::from("log_status"), Kind::bytes() | Kind::null()),
        (Field::from("packets"), Kind::integer() | Kind::null()),
        (Field::from("pkt_dstaddr"), Kind::bytes() | Kind::null()),
        (
            Field::from("pkt_dst_aws_service"),
            Kind::bytes() | Kind::null(),
        ),
        (Field::from("pkt_srcaddr"), Kind::bytes() | Kind::null()),
        (
            Field::from("pkt_src_aws_service"),
            Kind::bytes() | Kind::null(),
        ),
        (Field::from("protocol"), Kind::integer() | Kind::null()),
        (Field::from("region"), Kind::bytes() | Kind::null()),
        (Field::from("srcaddr"), Kind::bytes() | Kind::null()),
        (Field::from("srcport"), Kind::integer() | Kind::null()),
        (Field::from("start"), Kind::integer() | Kind::null()),
        (Field::from("sublocation_id"), Kind::bytes() | Kind::null()),
        (
            Field::from("sublocation_type"),
            Kind::bytes() | Kind::null(),
        ),
        (Field::from("subnet_id"), Kind::bytes() | Kind::null()),
        (Field::from("tcp_flags"), Kind::integer() | Kind::null()),
        (Field::from("traffic_path"), Kind::integer() | Kind::null()),
        (Field::from("type"), Kind::bytes() | Kind::null()),
        (Field::from("version"), Kind::integer() | Kind::null()),
        (Field::from("vpc_id"), Kind::bytes() | Kind::null()),
    ])
}

/// Result of a field transform; the error is a human-readable message.
type ParseResult<T> = std::result::Result<T, String>;

/// Transform for string-valued fields: hands the raw token back unchanged.
///
/// The unused field name keeps the `(key, value)` signature shared by all transforms.
#[allow(clippy::unnecessary_wraps)] // match other parse methods
fn identity<'a>(_field: &'a str, raw: &'a str) -> ParseResult<&'a str> {
    Ok(raw)
}

/// Transform for numeric fields: parses the token as a signed 64-bit integer.
///
/// On failure the message names both the field (`key`) and the offending token.
fn parse_i64(key: &str, value: &str) -> ParseResult<i64> {
    match value.parse::<i64>() {
        Ok(number) => Ok(number),
        Err(_) => Err(format!("failed to parse value as i64 (key: `{key}`): `{value}`")),
    }
}

// Expands to a `match` on `$key` that, for each `$name => $transform` pair listed,
// converts `$value` and inserts it into the map `$log` under `$name`.
//
// - A raw value of exactly `-` is stored as `Value::Null` without calling `$transform`.
// - Otherwise `$transform($name, value)` is called; its error is propagated with `?`
//   and its success value is converted into a `Value` via `.into()`.
// - Inserting a key that is already in `$log` returns a "value already exists" error.
// - A `$key` matching none of the listed names returns an "unknown key" error.
//
// The `return` and `?` in the expansion exit the *calling* function, so the caller
// must return a `Result` whose error type is `String`.
macro_rules! create_match {
    ($log:expr_2021, $key:expr_2021, $value:expr_2021, $($name:expr_2021 => $transform:expr_2021),+) => {
        match $key {
            $($name => {
                // `-` is the placeholder for "no data" and maps to null.
                let value = match $value {
                    "-" => Value::Null,
                    value => $transform($name, value)?.into(),
                };
                // Reject formats that name the same field twice.
                if $log.insert($name.into(), value).is_some() {
                    return Err(format!("value already exists for key: `{}`", $key));
                }
            })+
            key => return Err(format!("unknown key: `{}`", key))
        };
    };
}

/// Parses one VPC Flow Log record into an object keyed by field name.
///
/// `format` is the ordered list of field names, one per token in `input`. When it is
/// `None`, the default record layout is used (`version account_id … action log_status`).
///
/// Tokens in both `input` and `format` are separated by ASCII whitespace. Runs of
/// separators and leading/trailing whitespace, such as a trailing `\n` or `\r\n` on a
/// line read from a file, are ignored. They therefore never leak into the last field
/// or produce empty tokens. A `-` token is stored as null.
///
/// # Errors
///
/// Returns a message when:
/// - a format key is not a known flow-log field;
/// - the same key appears more than once in the format;
/// - a numeric field cannot be parsed as `i64`;
/// - there are more values than keys, or more keys than values.
fn parse_log(input: &str, format: Option<&str>) -> ParseResult<Value> {
    // Field order used when the caller does not supply a format.
    const DEFAULT_FORMAT: &str = "version account_id interface_id srcaddr dstaddr srcport dstport protocol packets bytes start end action log_status";

    let mut log = BTreeMap::new();

    let mut values = input.split_ascii_whitespace();
    let mut keys = format.unwrap_or(DEFAULT_FORMAT).split_ascii_whitespace();

    loop {
        match (keys.next(), values.next()) {
            (Some(key), Some(value)) => {
                // Converts `value` for `key` and inserts it, returning early on any error.
                create_match!(
                    log, key, value,
                    "account_id" => identity,
                    "action" => identity,
                    "az_id" => identity,
                    "bytes" => parse_i64,
                    "dstaddr" => identity,
                    "dstport" => parse_i64,
                    "end" => parse_i64,
                    "flow_direction" => identity,
                    "instance_id" => identity,
                    "interface_id" => identity,
                    "log_status" => identity,
                    "packets" => parse_i64,
                    "pkt_dstaddr" => identity,
                    "pkt_dst_aws_service" => identity,
                    "pkt_srcaddr" => identity,
                    "pkt_src_aws_service" => identity,
                    "protocol" => parse_i64,
                    "region" => identity,
                    "srcaddr" => identity,
                    "srcport" => parse_i64,
                    "start" => parse_i64,
                    "sublocation_id" => identity,
                    "sublocation_type" => identity,
                    "subnet_id" => identity,
                    "tcp_flags" => parse_i64,
                    "traffic_path" => parse_i64,
                    "type" => identity,
                    "version" => parse_i64,
                    "vpc_id" => identity
                );
            }
            (None, Some(value)) => return Err(format!("no key for value: `{value}`")),
            (Some(key), None) => return Err(format!("no item for key: `{key}`")),
            (None, None) => return Ok(log.into()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::value;

    /// Every documented AWS example record must parse with its format.
    #[test]
    fn parse_aws_vpc_flow_log() {
        // Examples from https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html
        let logs = vec![
            (
                None,
                vec![
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA",
                    "2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA",
                    "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK",
                ],
            ),
            (
                Some(
                    "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status",
                ),
                vec![
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43638 5001 52.213.180.42 10.0.0.62 6 1260 17 1566933133 1566933193 ACCEPT 3 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43638 10.0.0.62 52.213.180.42 6 967 14 1566933133 1566933193 ACCEPT 19 OK",
                ],
            ),
            (
                Some("instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr"),
                vec![
                    "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
                    "- eni-1235b8ca123456789 10.0.0.220 203.0.113.5 10.0.0.220 203.0.113.5",
                    "- eni-1235b8ca123456789 203.0.113.5 10.0.0.220 203.0.113.5 10.0.0.220",
                    "- eni-1235b8ca123456789 10.0.0.220 10.0.1.5 203.0.113.5 10.0.1.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 10.0.1.5 203.0.113.5 10.0.1.5 203.0.113.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 203.0.113.5 10.0.1.5 203.0.113.5 10.0.1.5",
                ],
            ),
            (
                Some(
                    "version interface_id account_id vpc_id subnet_id instance_id srcaddr dstaddr srcport dstport protocol tcp_flags type pkt_srcaddr pkt_dstaddr action log_status",
                ),
                vec![
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.20.33.164 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.40.2.236 10.20.33.164 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                    "3 eni-11111111111111111 123456789010 vpc-abcdefab012345678 subnet-11111111aaaaaaaaa - 10.40.1.175 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-22222222222222222 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb - 10.40.2.236 10.40.2.31 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                ],
            ),
            (
                Some(
                    "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status",
                ),
                vec![
                    "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
                    "5 10.0.0.71 52.95.128.179 34210 80 6 1616729292 1616729349 IPv4 7 471 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 3 10.0.0.71 52.95.128.179 - S3 8 egress OK",
                ],
            ),
        ];

        for (format, logs) in logs {
            for log in logs {
                let result = parse_log(log, format);
                // Report which record failed and why, instead of a bare `is_ok()` failure.
                assert!(
                    result.is_ok(),
                    "failed to parse {log:?} (format: {format:?}): {result:?}"
                );
            }
        }
    }

    /// Each malformed-input path yields its specific error message.
    #[test]
    fn parse_log_reports_errors() {
        let cases = [
            ("1", Some("foo"), "unknown key: `foo`"),
            (
                "1 2",
                Some("version version"),
                "value already exists for key: `version`",
            ),
            (
                "2 123456789010 extra",
                Some("version account_id"),
                "no key for value: `extra`",
            ),
            ("2", Some("version account_id"), "no item for key: `account_id`"),
            (
                "two",
                Some("version"),
                "failed to parse value as i64 (key: `version`): `two`",
            ),
        ];

        for (input, format, expected) in cases {
            assert_eq!(
                parse_log(input, format).unwrap_err(),
                expected,
                "input: {input:?}, format: {format:?}"
            );
        }
    }

    test_function![
        parse_aws_vpc_flow_log => ParseAwsVpcFlowLog;

        default {
             args: func_args![value: "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK"],
             want: Ok(value!({
                 "account_id": "123456789010",
                 "action": "ACCEPT",
                 "bytes": 4249,
                 "dstaddr": "172.31.16.21",
                 "dstport": 22,
                 "end": 1_418_530_070,
                 "interface_id": "eni-1235b8ca123456789",
                 "log_status": "OK",
                 "packets": 20,
                 "protocol": 6,
                 "srcaddr": "172.31.16.139",
                 "srcport": 20641,
                 "start": 1_418_530_010,
                 "version": 2
             })),
             tdef: TypeDef::object(inner_kind()).fallible(),
         }

        fields {
             args: func_args![value: "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                              format: "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status"],
             want: Ok(value!({
                 "account_id": "123456789010",
                 "action": "ACCEPT",
                 "bytes": 568,
                 "dstaddr": "10.0.0.62",
                 "dstport": 5001,
                 "end": 1_566_848_933,
                 "instance_id": "i-01234567890123456",
                 "interface_id": "eni-1235b8ca123456789",
                 "log_status": "OK",
                 "packets": 8,
                 "pkt_dstaddr": "10.0.0.62",
                 "pkt_srcaddr": "52.213.180.42",
                 "protocol": 6,
                 "srcaddr": "52.213.180.42",
                 "srcport": 43416,
                 "start": 1_566_848_875,
                 "subnet_id": "subnet-aaaaaaaa012345678",
                 "tcp_flags": 2,
                 "type": "IPv4",
                 "version": 3,
                 "vpc_id": "vpc-abcdefab012345678"
             })),
             tdef: TypeDef::object(inner_kind()).fallible(),
         }
    ];
}