// cloudfront_logs/parquet/mod.rs

1use crate::{shared::*, types::*, CheckedRawLogLine};
2
3pub use LogLine as ParquetLogLine;
4
5/// A mostly borrowed version suitable for writing into parquet files
6///
7/// It's similar to [`CheckedRawLogLine`],
8/// but with some fields slightly more typed (time, numbers, options)
9#[derive(Debug, PartialEq, parquet_derive::ParquetRecordWriter)]
10pub struct LogLine<'a> {
11    pub date: NaiveDate,
12    pub time: &'a str, // not supported: NaiveTime
13    pub datetime: NaiveDateTime,
14    pub x_edge_location: &'a str,
15    pub sc_bytes: u64,
16    pub c_ip: &'a str,
17    pub cs_method: &'a str,
18    pub cs_host: &'a str,
19    pub cs_uri_stem: &'a str,
20    pub sc_status: u16,
21    pub cs_referer: Option<&'a str>,
22    pub cs_user_agent: &'a str,
23    pub cs_uri_query: Option<&'a str>,
24    pub cs_cookie: Option<&'a str>,
25    pub x_edge_result_type: &'a str,
26    pub x_edge_request_id: &'a str,
27    pub x_host_header: &'a str,
28    pub cs_protocol: &'a str,
29    pub cs_bytes: u64,
30    pub time_taken: f64, // not supported: Duration
31    pub x_forwarded_for: Option<&'a str>,
32    pub ssl_protocol: Option<&'a str>,
33    pub ssl_cipher: Option<&'a str>,
34    pub x_edge_response_result_type: &'a str,
35    pub cs_protocol_version: &'a str,
36    pub fle_status: Option<&'a str>,
37    pub fle_encrypted_fields: Option<u64>,
38    pub c_port: u16,
39    pub time_to_first_byte: f64, // not supported: Duration
40    pub x_edge_detailed_result_type: &'a str,
41    pub sc_content_type: &'a str,
42    pub sc_content_len: u64,
43    pub sc_range_start: Option<i64>,
44    pub sc_range_end: Option<i64>,
45}
46
47impl<'a> TryFrom<&'a str> for LogLine<'a> {
48    type Error = &'static str;
49
50    fn try_from(line: &'a str) -> Result<Self, Self::Error> {
51        validate_line(line)?;
52
53        let mut iter = MemchrTabSplitter::new(line);
54
55        let date = NaiveDate::parse_from_str(iter.next().unwrap(), "%Y-%m-%d")
56            .map_err(|_e| "date invalid")?;
57        let raw_time = iter.next().unwrap();
58        let time = NaiveTime::parse_from_str(raw_time, "%H:%M:%S").map_err(|_e| "time invalid")?;
59        let datetime = NaiveDateTime::new(date, time);
60
61        let line = Self {
62            date,
63            time: raw_time,
64            datetime,
65            x_edge_location: iter.next().unwrap(),
66            sc_bytes: iter
67                .next()
68                .unwrap()
69                .parse::<u64>()
70                .map_err(|_e| "sc_bytes invalid")?,
71            c_ip: iter.next().unwrap(),
72            cs_method: iter.next().unwrap(),
73            cs_host: iter.next().unwrap(),
74            cs_uri_stem: iter.next().unwrap(),
75            sc_status: iter
76                .next()
77                .unwrap()
78                .parse::<u16>()
79                .map_err(|_e| "sc_status invalid")?,
80            cs_referer: iter.next().and_then(str::as_optional_str),
81            cs_user_agent: iter.next().unwrap(),
82            cs_uri_query: iter.next().and_then(str::as_optional_str),
83            cs_cookie: iter.next().and_then(str::as_optional_str),
84            x_edge_result_type: iter.next().unwrap(),
85            x_edge_request_id: iter.next().unwrap(),
86            x_host_header: iter.next().unwrap(),
87            cs_protocol: iter.next().unwrap(),
88            cs_bytes: iter
89                .next()
90                .unwrap()
91                .parse::<u64>()
92                .map_err(|_e| "cs_bytes invalid")?,
93            time_taken: iter
94                .next()
95                .unwrap()
96                .parse::<f64>()
97                .map_err(|_e| "time_taken invalid")?,
98            x_forwarded_for: iter.next().and_then(str::as_optional_str),
99            ssl_protocol: iter.next().and_then(str::as_optional_str),
100            ssl_cipher: iter.next().and_then(str::as_optional_str),
101            x_edge_response_result_type: iter.next().unwrap(),
102            cs_protocol_version: iter.next().unwrap(),
103            fle_status: iter.next().and_then(str::as_optional_str),
104            fle_encrypted_fields: iter
105                .next()
106                .and_then(as_optional_t)
107                .transpose()
108                .map_err(|_e| "fle_encrypted_fields invalid")?,
109            c_port: iter
110                .next()
111                .unwrap()
112                .parse::<u16>()
113                .map_err(|_e| "c_port invalid")?,
114            time_to_first_byte: iter
115                .next()
116                .unwrap()
117                .parse::<f64>()
118                .map_err(|_e| "time_to_first_byte invalid")?,
119            x_edge_detailed_result_type: iter.next().unwrap(),
120            sc_content_type: iter.next().unwrap(),
121            sc_content_len: iter
122                .next()
123                .unwrap()
124                .parse::<u64>()
125                .map_err(|_e| "sc_content_len invalid")?,
126            sc_range_start: iter
127                .next()
128                .and_then(as_optional_t)
129                .transpose()
130                .map_err(|_e| "sc_range_start invalid")?,
131            sc_range_end: iter
132                .next()
133                .and_then(as_optional_t)
134                .transpose()
135                .map_err(|_e| "sc_range_end invalid")?,
136        };
137        Ok(line)
138    }
139}
140
141impl<'a> TryFrom<CheckedRawLogLine<'a>> for LogLine<'a> {
142    type Error = &'static str;
143
144    fn try_from(raw: CheckedRawLogLine<'a>) -> Result<Self, Self::Error> {
145        let date = NaiveDate::parse_from_str(raw.date, "%Y-%m-%d").map_err(|_e| "date invalid")?;
146        let time = NaiveTime::parse_from_str(raw.time, "%H:%M:%S").map_err(|_e| "time invalid")?;
147        let datetime = NaiveDateTime::new(date, time);
148
149        let line = Self {
150            date,
151            time: raw.time,
152            datetime,
153            x_edge_location: raw.x_edge_location,
154            sc_bytes: raw
155                .sc_bytes
156                .parse::<u64>()
157                .map_err(|_e| "sc_bytes invalid")?,
158            c_ip: raw.c_ip,
159            cs_method: raw.cs_method,
160            cs_host: raw.cs_host,
161            cs_uri_stem: raw.cs_uri_stem,
162            sc_status: raw
163                .sc_status
164                .parse::<u16>()
165                .map_err(|_e| "sc_status invalid")?,
166            cs_referer: raw.cs_referer.as_optional_str(),
167            cs_user_agent: raw.cs_user_agent,
168            cs_uri_query: raw.cs_uri_query.as_optional_str(),
169            cs_cookie: raw.cs_cookie.as_optional_str(),
170            x_edge_result_type: raw.x_edge_result_type,
171            x_edge_request_id: raw.x_edge_request_id,
172            x_host_header: raw.x_host_header,
173            cs_protocol: raw.cs_protocol,
174            cs_bytes: raw
175                .cs_bytes
176                .parse::<u64>()
177                .map_err(|_e| "cs_bytes invalid")?,
178            time_taken: raw
179                .time_taken
180                .parse::<f64>()
181                .map_err(|_e| "time_taken invalid")?,
182            x_forwarded_for: raw.x_forwarded_for.as_optional_str(),
183            ssl_protocol: raw.ssl_protocol.as_optional_str(),
184            ssl_cipher: raw.ssl_cipher.as_optional_str(),
185            x_edge_response_result_type: raw.x_edge_response_result_type,
186            cs_protocol_version: raw.cs_protocol_version,
187            fle_status: raw.fle_status.as_optional_str(),
188            fle_encrypted_fields: parse_as_option(raw.fle_encrypted_fields)
189                .map_err(|_e| "fle_encrypted_fields invalid")?,
190            c_port: raw.c_port.parse::<u16>().map_err(|_e| "c_port invalid")?,
191            time_to_first_byte: raw
192                .time_to_first_byte
193                .parse::<f64>()
194                .map_err(|_e| "time_to_first_byte invalid")?,
195            x_edge_detailed_result_type: raw.x_edge_detailed_result_type,
196            sc_content_type: raw.sc_content_type,
197            sc_content_len: raw
198                .sc_content_len
199                .parse::<u64>()
200                .map_err(|_e| "sc_content_len invalid")?,
201            sc_range_start: parse_as_option(raw.sc_range_start)
202                .map_err(|_e| "sc_range_start invalid")?,
203            sc_range_end: parse_as_option(raw.sc_range_end).map_err(|_e| "sc_range_end invalid")?,
204        };
205        Ok(line)
206    }
207}