1use compact_str::CompactString;
7use smallvec::SmallVec;
8
9use super::{FieldValue, ParseContext, ParseResult, Protocol};
10use crate::schema::{DataKind, FieldDescriptor};
11
12pub const NTP_PORT: u16 = 123;
14
15const NTP_HEADER_SIZE: usize = 48;
17
18#[derive(Debug, Clone, Copy)]
20pub struct NtpProtocol;
21
22impl Protocol for NtpProtocol {
23 fn name(&self) -> &'static str {
24 "ntp"
25 }
26
27 fn display_name(&self) -> &'static str {
28 "NTP"
29 }
30
31 fn can_parse(&self, context: &ParseContext) -> Option<u32> {
32 let src_port = context.hint("src_port");
34 let dst_port = context.hint("dst_port");
35
36 match (src_port, dst_port) {
37 (Some(p), _) | (_, Some(p)) if p == NTP_PORT as u64 => Some(100),
38 _ => None,
39 }
40 }
41
42 fn parse<'a>(&self, data: &'a [u8], _context: &ParseContext) -> ParseResult<'a> {
43 if data.len() < NTP_HEADER_SIZE {
45 return ParseResult::error("NTP header too short".to_string(), data);
46 }
47
48 let mut fields = SmallVec::new();
49
50 let first_byte = data[0];
52 let li = (first_byte >> 6) & 0x03; let version = (first_byte >> 3) & 0x07; let mode = first_byte & 0x07; fields.push(("version", FieldValue::UInt8(version)));
57 fields.push(("mode", FieldValue::UInt8(mode)));
58 fields.push(("leap_indicator", FieldValue::UInt8(li)));
59
60 let stratum = data[1];
62 fields.push(("stratum", FieldValue::UInt8(stratum)));
63
64 let poll = data[2] as i8;
66 fields.push(("poll", FieldValue::UInt8(poll as u8)));
67
68 let precision = data[3] as i8;
70 fields.push(("precision", FieldValue::UInt8(precision as u8)));
71
72 let root_delay = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
74 fields.push(("root_delay", FieldValue::UInt32(root_delay)));
75
76 let root_dispersion = u32::from_be_bytes([data[8], data[9], data[10], data[11]]);
78 fields.push(("root_dispersion", FieldValue::UInt32(root_dispersion)));
79
80 let ref_id_bytes = &data[12..16];
82 let reference_id = if stratum == 0 || stratum == 1 {
83 CompactString::new(String::from_utf8_lossy(ref_id_bytes).trim_end_matches('\0'))
85 } else {
86 CompactString::new(format!(
88 "{}.{}.{}.{}",
89 ref_id_bytes[0], ref_id_bytes[1], ref_id_bytes[2], ref_id_bytes[3]
90 ))
91 };
92 fields.push(("reference_id", FieldValue::OwnedString(reference_id)));
93
94 let reference_ts = read_ntp_timestamp(&data[16..24]);
96 fields.push(("reference_ts", FieldValue::Int64(reference_ts)));
97
98 let origin_ts = read_ntp_timestamp(&data[24..32]);
100 fields.push(("origin_ts", FieldValue::Int64(origin_ts)));
101
102 let receive_ts = read_ntp_timestamp(&data[32..40]);
104 fields.push(("receive_ts", FieldValue::Int64(receive_ts)));
105
106 let transmit_ts = read_ntp_timestamp(&data[40..48]);
108 fields.push(("transmit_ts", FieldValue::Int64(transmit_ts)));
109
110 let remaining = &data[NTP_HEADER_SIZE..];
112
113 ParseResult::success(fields, remaining, SmallVec::new())
114 }
115
116 fn schema_fields(&self) -> Vec<FieldDescriptor> {
117 vec![
118 FieldDescriptor::new("ntp.version", DataKind::UInt8).set_nullable(true),
119 FieldDescriptor::new("ntp.mode", DataKind::UInt8).set_nullable(true),
120 FieldDescriptor::new("ntp.leap_indicator", DataKind::UInt8).set_nullable(true),
121 FieldDescriptor::new("ntp.stratum", DataKind::UInt8).set_nullable(true),
122 FieldDescriptor::new("ntp.poll", DataKind::UInt8).set_nullable(true),
123 FieldDescriptor::new("ntp.precision", DataKind::UInt8).set_nullable(true),
124 FieldDescriptor::new("ntp.root_delay", DataKind::UInt32).set_nullable(true),
125 FieldDescriptor::new("ntp.root_dispersion", DataKind::UInt32).set_nullable(true),
126 FieldDescriptor::new("ntp.reference_id", DataKind::String).set_nullable(true),
127 FieldDescriptor::new("ntp.reference_ts", DataKind::Int64).set_nullable(true),
128 FieldDescriptor::new("ntp.origin_ts", DataKind::Int64).set_nullable(true),
129 FieldDescriptor::new("ntp.receive_ts", DataKind::Int64).set_nullable(true),
130 FieldDescriptor::new("ntp.transmit_ts", DataKind::Int64).set_nullable(true),
131 ]
132 }
133
134 fn child_protocols(&self) -> &[&'static str] {
135 &[]
136 }
137
138 fn dependencies(&self) -> &'static [&'static str] {
139 &["udp"]
140 }
141}
142
143fn read_ntp_timestamp(data: &[u8]) -> i64 {
146 if data.len() < 8 {
147 return 0;
148 }
149 let seconds = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
150 seconds as i64
152}
153
154#[allow(dead_code)]
156pub mod mode {
157 pub const RESERVED: u8 = 0;
158 pub const SYMMETRIC_ACTIVE: u8 = 1;
159 pub const SYMMETRIC_PASSIVE: u8 = 2;
160 pub const CLIENT: u8 = 3;
161 pub const SERVER: u8 = 4;
162 pub const BROADCAST: u8 = 5;
163 pub const CONTROL: u8 = 6;
164 pub const PRIVATE: u8 = 7;
165}
166
167#[allow(dead_code)]
169pub mod stratum {
170 pub const UNSPECIFIED: u8 = 0;
171 pub const PRIMARY: u8 = 1;
172 pub const UNSYNCHRONIZED: u8 = 16;
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179
180 fn create_ntp_client_request() -> Vec<u8> {
182 let mut packet = vec![0u8; 48];
183
184 packet[0] = 0x23; packet[1] = 0;
189
190 packet[2] = 6;
192
193 packet[3] = 0xEC; packet
202 }
203
204 fn create_ntp_server_response() -> Vec<u8> {
206 let mut packet = vec![0u8; 48];
207
208 packet[0] = 0x24; packet[1] = 1;
213
214 packet[2] = 6;
216
217 packet[3] = 0xEC;
219
220 packet[4..8].copy_from_slice(&0x00000100u32.to_be_bytes());
222
223 packet[8..12].copy_from_slice(&0x00000200u32.to_be_bytes());
225
226 packet[12..16].copy_from_slice(b"GPS\0");
228
229 packet[16..20].copy_from_slice(&0xE1B23456u32.to_be_bytes());
231
232 packet[24..28].copy_from_slice(&0xE1B23460u32.to_be_bytes());
234
235 packet[32..36].copy_from_slice(&0xE1B23461u32.to_be_bytes());
237
238 packet[40..44].copy_from_slice(&0xE1B23462u32.to_be_bytes());
240
241 packet
242 }
243
244 #[test]
245 fn test_can_parse_ntp() {
246 let parser = NtpProtocol;
247
248 let ctx1 = ParseContext::new(1);
250 assert!(parser.can_parse(&ctx1).is_none());
251
252 let mut ctx2 = ParseContext::new(1);
254 ctx2.insert_hint("dst_port", 123);
255 assert!(parser.can_parse(&ctx2).is_some());
256
257 let mut ctx3 = ParseContext::new(1);
259 ctx3.insert_hint("src_port", 123);
260 assert!(parser.can_parse(&ctx3).is_some());
261
262 let mut ctx4 = ParseContext::new(1);
264 ctx4.insert_hint("dst_port", 80);
265 assert!(parser.can_parse(&ctx4).is_none());
266 }
267
268 #[test]
269 fn test_parse_ntp_client_request() {
270 let packet = create_ntp_client_request();
271
272 let parser = NtpProtocol;
273 let mut context = ParseContext::new(1);
274 context.insert_hint("dst_port", 123);
275
276 let result = parser.parse(&packet, &context);
277
278 assert!(result.is_ok());
279 assert_eq!(result.get("version"), Some(&FieldValue::UInt8(4)));
280 assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(3))); assert_eq!(result.get("stratum"), Some(&FieldValue::UInt8(0)));
282 }
283
284 #[test]
285 fn test_parse_ntp_server_response() {
286 let packet = create_ntp_server_response();
287
288 let parser = NtpProtocol;
289 let mut context = ParseContext::new(1);
290 context.insert_hint("src_port", 123);
291
292 let result = parser.parse(&packet, &context);
293
294 assert!(result.is_ok());
295 assert_eq!(result.get("version"), Some(&FieldValue::UInt8(4)));
296 assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(4))); assert_eq!(result.get("stratum"), Some(&FieldValue::UInt8(1))); assert_eq!(
299 result.get("reference_id"),
300 Some(&FieldValue::OwnedString(CompactString::new("GPS")))
301 );
302 }
303
304 #[test]
305 fn test_ntp_version_modes() {
306 let mut packet = vec![0u8; 48];
308 packet[0] = 0x1B; let parser = NtpProtocol;
311 let context = ParseContext::new(1);
312
313 let result = parser.parse(&packet, &context);
314
315 assert!(result.is_ok());
316 assert_eq!(result.get("version"), Some(&FieldValue::UInt8(3)));
317 assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(3)));
318 }
319
320 #[test]
321 fn test_ntp_schema_fields() {
322 let parser = NtpProtocol;
323 let fields = parser.schema_fields();
324
325 assert!(!fields.is_empty());
326
327 let field_names: Vec<&str> = fields.iter().map(|f| f.name).collect();
328 assert!(field_names.contains(&"ntp.version"));
329 assert!(field_names.contains(&"ntp.mode"));
330 assert!(field_names.contains(&"ntp.stratum"));
331 assert!(field_names.contains(&"ntp.reference_ts"));
332 }
333
334 #[test]
335 fn test_ntp_too_short() {
336 let short_packet = vec![0u8; 20]; let parser = NtpProtocol;
339 let context = ParseContext::new(1);
340
341 let result = parser.parse(&short_packet, &context);
342
343 assert!(!result.is_ok());
344 assert!(result.error.is_some());
345 }
346}