pcapsql_core/protocol/
ntp.rs

1//! NTP protocol parser.
2//!
3//! Parses NTP (Network Time Protocol) messages used for time synchronization.
4//! Matches on UDP port 123.
5
6use compact_str::CompactString;
7use smallvec::SmallVec;
8
9use super::{FieldValue, ParseContext, ParseResult, Protocol};
10use crate::schema::{DataKind, FieldDescriptor};
11
12/// NTP port.
13pub const NTP_PORT: u16 = 123;
14
15/// NTP header size.
16const NTP_HEADER_SIZE: usize = 48;
17
18/// NTP protocol parser.
19#[derive(Debug, Clone, Copy)]
20pub struct NtpProtocol;
21
22impl Protocol for NtpProtocol {
23    fn name(&self) -> &'static str {
24        "ntp"
25    }
26
27    fn display_name(&self) -> &'static str {
28        "NTP"
29    }
30
31    fn can_parse(&self, context: &ParseContext) -> Option<u32> {
32        // Check for NTP port
33        let src_port = context.hint("src_port");
34        let dst_port = context.hint("dst_port");
35
36        match (src_port, dst_port) {
37            (Some(p), _) | (_, Some(p)) if p == NTP_PORT as u64 => Some(100),
38            _ => None,
39        }
40    }
41
42    fn parse<'a>(&self, data: &'a [u8], _context: &ParseContext) -> ParseResult<'a> {
43        // NTP header is 48 bytes
44        if data.len() < NTP_HEADER_SIZE {
45            return ParseResult::error("NTP header too short".to_string(), data);
46        }
47
48        let mut fields = SmallVec::new();
49
50        // First byte: LI (2 bits), VN (3 bits), Mode (3 bits)
51        let first_byte = data[0];
52        let li = (first_byte >> 6) & 0x03; // Leap Indicator
53        let version = (first_byte >> 3) & 0x07; // Version Number
54        let mode = first_byte & 0x07; // Mode
55
56        fields.push(("version", FieldValue::UInt8(version)));
57        fields.push(("mode", FieldValue::UInt8(mode)));
58        fields.push(("leap_indicator", FieldValue::UInt8(li)));
59
60        // Second byte: Stratum
61        let stratum = data[1];
62        fields.push(("stratum", FieldValue::UInt8(stratum)));
63
64        // Third byte: Poll (signed, log2 seconds)
65        let poll = data[2] as i8;
66        fields.push(("poll", FieldValue::UInt8(poll as u8)));
67
68        // Fourth byte: Precision (signed, log2 seconds)
69        let precision = data[3] as i8;
70        fields.push(("precision", FieldValue::UInt8(precision as u8)));
71
72        // Root Delay (4 bytes, signed fixed-point)
73        let root_delay = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
74        fields.push(("root_delay", FieldValue::UInt32(root_delay)));
75
76        // Root Dispersion (4 bytes, unsigned fixed-point)
77        let root_dispersion = u32::from_be_bytes([data[8], data[9], data[10], data[11]]);
78        fields.push(("root_dispersion", FieldValue::UInt32(root_dispersion)));
79
80        // Reference ID (4 bytes) - meaning depends on stratum
81        let ref_id_bytes = &data[12..16];
82        let reference_id = if stratum == 0 || stratum == 1 {
83            // For stratum 0-1, it's ASCII (e.g., "GPS", "PPS")
84            CompactString::new(String::from_utf8_lossy(ref_id_bytes).trim_end_matches('\0'))
85        } else {
86            // For stratum 2+, it's an IP address
87            CompactString::new(format!(
88                "{}.{}.{}.{}",
89                ref_id_bytes[0], ref_id_bytes[1], ref_id_bytes[2], ref_id_bytes[3]
90            ))
91        };
92        fields.push(("reference_id", FieldValue::OwnedString(reference_id)));
93
94        // Reference Timestamp (8 bytes) - NTP format
95        let reference_ts = read_ntp_timestamp(&data[16..24]);
96        fields.push(("reference_ts", FieldValue::Int64(reference_ts)));
97
98        // Origin Timestamp (8 bytes)
99        let origin_ts = read_ntp_timestamp(&data[24..32]);
100        fields.push(("origin_ts", FieldValue::Int64(origin_ts)));
101
102        // Receive Timestamp (8 bytes)
103        let receive_ts = read_ntp_timestamp(&data[32..40]);
104        fields.push(("receive_ts", FieldValue::Int64(receive_ts)));
105
106        // Transmit Timestamp (8 bytes)
107        let transmit_ts = read_ntp_timestamp(&data[40..48]);
108        fields.push(("transmit_ts", FieldValue::Int64(transmit_ts)));
109
110        // Any remaining data would be extension fields or authentication
111        let remaining = &data[NTP_HEADER_SIZE..];
112
113        ParseResult::success(fields, remaining, SmallVec::new())
114    }
115
116    fn schema_fields(&self) -> Vec<FieldDescriptor> {
117        vec![
118            FieldDescriptor::new("ntp.version", DataKind::UInt8).set_nullable(true),
119            FieldDescriptor::new("ntp.mode", DataKind::UInt8).set_nullable(true),
120            FieldDescriptor::new("ntp.leap_indicator", DataKind::UInt8).set_nullable(true),
121            FieldDescriptor::new("ntp.stratum", DataKind::UInt8).set_nullable(true),
122            FieldDescriptor::new("ntp.poll", DataKind::UInt8).set_nullable(true),
123            FieldDescriptor::new("ntp.precision", DataKind::UInt8).set_nullable(true),
124            FieldDescriptor::new("ntp.root_delay", DataKind::UInt32).set_nullable(true),
125            FieldDescriptor::new("ntp.root_dispersion", DataKind::UInt32).set_nullable(true),
126            FieldDescriptor::new("ntp.reference_id", DataKind::String).set_nullable(true),
127            FieldDescriptor::new("ntp.reference_ts", DataKind::Int64).set_nullable(true),
128            FieldDescriptor::new("ntp.origin_ts", DataKind::Int64).set_nullable(true),
129            FieldDescriptor::new("ntp.receive_ts", DataKind::Int64).set_nullable(true),
130            FieldDescriptor::new("ntp.transmit_ts", DataKind::Int64).set_nullable(true),
131        ]
132    }
133
134    fn child_protocols(&self) -> &[&'static str] {
135        &[]
136    }
137
138    fn dependencies(&self) -> &'static [&'static str] {
139        &["udp"]
140    }
141}
142
143/// Read an NTP timestamp (8 bytes) and return as i64 (seconds since 1900).
144/// NTP timestamp: 32-bit seconds + 32-bit fraction.
145fn read_ntp_timestamp(data: &[u8]) -> i64 {
146    if data.len() < 8 {
147        return 0;
148    }
149    let seconds = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
150    // We're ignoring the fractional part for simplicity
151    seconds as i64
152}
153
154/// NTP modes.
155#[allow(dead_code)]
156pub mod mode {
157    pub const RESERVED: u8 = 0;
158    pub const SYMMETRIC_ACTIVE: u8 = 1;
159    pub const SYMMETRIC_PASSIVE: u8 = 2;
160    pub const CLIENT: u8 = 3;
161    pub const SERVER: u8 = 4;
162    pub const BROADCAST: u8 = 5;
163    pub const CONTROL: u8 = 6;
164    pub const PRIVATE: u8 = 7;
165}
166
167/// NTP stratum levels.
168#[allow(dead_code)]
169pub mod stratum {
170    pub const UNSPECIFIED: u8 = 0;
171    pub const PRIMARY: u8 = 1;
172    // 2-15: Secondary reference (via NTP)
173    pub const UNSYNCHRONIZED: u8 = 16;
174}
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179
180    /// Create an NTP client request packet.
181    fn create_ntp_client_request() -> Vec<u8> {
182        let mut packet = vec![0u8; 48];
183
184        // LI=0, VN=4, Mode=3 (client)
185        packet[0] = 0x23; // 0b00100011
186
187        // Stratum: 0 (unspecified)
188        packet[1] = 0;
189
190        // Poll: 6 (64 seconds)
191        packet[2] = 6;
192
193        // Precision: -20
194        packet[3] = 0xEC; // -20 as signed byte
195
196        // Root Delay: 0
197        // Root Dispersion: 0
198        // Reference ID: 0
199        // All timestamps: 0 for client request
200
201        packet
202    }
203
204    /// Create an NTP server response packet.
205    fn create_ntp_server_response() -> Vec<u8> {
206        let mut packet = vec![0u8; 48];
207
208        // LI=0, VN=4, Mode=4 (server)
209        packet[0] = 0x24; // 0b00100100
210
211        // Stratum: 1 (primary)
212        packet[1] = 1;
213
214        // Poll: 6
215        packet[2] = 6;
216
217        // Precision: -20
218        packet[3] = 0xEC;
219
220        // Root Delay: some small value
221        packet[4..8].copy_from_slice(&0x00000100u32.to_be_bytes());
222
223        // Root Dispersion: some small value
224        packet[8..12].copy_from_slice(&0x00000200u32.to_be_bytes());
225
226        // Reference ID: "GPS\0" for stratum 1
227        packet[12..16].copy_from_slice(b"GPS\0");
228
229        // Reference timestamp: some NTP time
230        packet[16..20].copy_from_slice(&0xE1B23456u32.to_be_bytes());
231
232        // Origin timestamp
233        packet[24..28].copy_from_slice(&0xE1B23460u32.to_be_bytes());
234
235        // Receive timestamp
236        packet[32..36].copy_from_slice(&0xE1B23461u32.to_be_bytes());
237
238        // Transmit timestamp
239        packet[40..44].copy_from_slice(&0xE1B23462u32.to_be_bytes());
240
241        packet
242    }
243
244    #[test]
245    fn test_can_parse_ntp() {
246        let parser = NtpProtocol;
247
248        // Without hint
249        let ctx1 = ParseContext::new(1);
250        assert!(parser.can_parse(&ctx1).is_none());
251
252        // With dst_port 123
253        let mut ctx2 = ParseContext::new(1);
254        ctx2.insert_hint("dst_port", 123);
255        assert!(parser.can_parse(&ctx2).is_some());
256
257        // With src_port 123
258        let mut ctx3 = ParseContext::new(1);
259        ctx3.insert_hint("src_port", 123);
260        assert!(parser.can_parse(&ctx3).is_some());
261
262        // With different port
263        let mut ctx4 = ParseContext::new(1);
264        ctx4.insert_hint("dst_port", 80);
265        assert!(parser.can_parse(&ctx4).is_none());
266    }
267
268    #[test]
269    fn test_parse_ntp_client_request() {
270        let packet = create_ntp_client_request();
271
272        let parser = NtpProtocol;
273        let mut context = ParseContext::new(1);
274        context.insert_hint("dst_port", 123);
275
276        let result = parser.parse(&packet, &context);
277
278        assert!(result.is_ok());
279        assert_eq!(result.get("version"), Some(&FieldValue::UInt8(4)));
280        assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(3))); // Client
281        assert_eq!(result.get("stratum"), Some(&FieldValue::UInt8(0)));
282    }
283
284    #[test]
285    fn test_parse_ntp_server_response() {
286        let packet = create_ntp_server_response();
287
288        let parser = NtpProtocol;
289        let mut context = ParseContext::new(1);
290        context.insert_hint("src_port", 123);
291
292        let result = parser.parse(&packet, &context);
293
294        assert!(result.is_ok());
295        assert_eq!(result.get("version"), Some(&FieldValue::UInt8(4)));
296        assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(4))); // Server
297        assert_eq!(result.get("stratum"), Some(&FieldValue::UInt8(1))); // Primary
298        assert_eq!(
299            result.get("reference_id"),
300            Some(&FieldValue::OwnedString(CompactString::new("GPS")))
301        );
302    }
303
304    #[test]
305    fn test_ntp_version_modes() {
306        // Test NTPv3 client
307        let mut packet = vec![0u8; 48];
308        packet[0] = 0x1B; // LI=0, VN=3, Mode=3
309
310        let parser = NtpProtocol;
311        let context = ParseContext::new(1);
312
313        let result = parser.parse(&packet, &context);
314
315        assert!(result.is_ok());
316        assert_eq!(result.get("version"), Some(&FieldValue::UInt8(3)));
317        assert_eq!(result.get("mode"), Some(&FieldValue::UInt8(3)));
318    }
319
320    #[test]
321    fn test_ntp_schema_fields() {
322        let parser = NtpProtocol;
323        let fields = parser.schema_fields();
324
325        assert!(!fields.is_empty());
326
327        let field_names: Vec<&str> = fields.iter().map(|f| f.name).collect();
328        assert!(field_names.contains(&"ntp.version"));
329        assert!(field_names.contains(&"ntp.mode"));
330        assert!(field_names.contains(&"ntp.stratum"));
331        assert!(field_names.contains(&"ntp.reference_ts"));
332    }
333
334    #[test]
335    fn test_ntp_too_short() {
336        let short_packet = vec![0u8; 20]; // Too short for NTP
337
338        let parser = NtpProtocol;
339        let context = ParseContext::new(1);
340
341        let result = parser.parse(&short_packet, &context);
342
343        assert!(!result.is_ok());
344        assert!(result.error.is_some());
345    }
346}