ocular-protocol 0.10.0

Wire protocol parsers for ocular (Redis, MySQL, PostgreSQL, MongoDB, AMQP, HTTP)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
// Kafka binary protocol parser
// Wire format: [length:4][api_key:2][api_version:2][correlation_id:4][client_id...]...
// Response:    [length:4][correlation_id:4]...

/// Map a Kafka request `api_key` to its protocol name.
///
/// The numbering follows the API key table of the Apache Kafka protocol guide.
/// Keys that are not listed here (including negative values) yield `"Unknown"`,
/// which callers use as a sentinel to fall back to a numeric rendering.
fn api_key_name(key: i16) -> &'static str {
    match key {
        0 => "Produce",
        1 => "Fetch",
        2 => "ListOffsets",
        3 => "Metadata",
        4 => "LeaderAndIsr",
        5 => "StopReplica",
        6 => "UpdateMetadata",
        7 => "ControlledShutdown",
        8 => "OffsetCommit",
        9 => "OffsetFetch",
        10 => "FindCoordinator",
        11 => "JoinGroup",
        12 => "Heartbeat",
        13 => "LeaveGroup",
        14 => "SyncGroup",
        15 => "DescribeGroups",
        16 => "ListGroups",
        17 => "SaslHandshake",
        18 => "ApiVersions",
        19 => "CreateTopics",
        20 => "DeleteTopics",
        21 => "DeleteRecords",
        22 => "InitProducerId",
        23 => "OffsetForLeaderEpoch",
        24 => "AddPartitionsToTxn",
        25 => "AddOffsetsToTxn",
        26 => "EndTxn",
        27 => "WriteTxnMarkers",
        28 => "TxnOffsetCommit",
        29 => "DescribeAcls",
        30 => "CreateAcls",
        31 => "DeleteAcls",
        32 => "DescribeConfigs",
        33 => "AlterConfigs",
        34 => "AlterReplicaLogDirs",
        35 => "DescribeLogDirs",
        36 => "SaslAuthenticate",
        37 => "CreatePartitions",
        38 => "CreateDelegationToken",
        39 => "RenewDelegationToken",
        40 => "ExpireDelegationToken",
        41 => "DescribeDelegationToken",
        42 => "DeleteGroups",
        43 => "ElectLeaders",
        44 => "IncrementalAlterConfigs",
        45 => "AlterPartitionReassignments",
        46 => "ListPartitionReassignments",
        47 => "OffsetDelete",
        48 => "DescribeClientQuotas",
        49 => "AlterClientQuotas",
        50 => "DescribeUserScramCredentials",
        51 => "AlterUserScramCredentials",
        57 => "UpdateFeatures",
        60 => "DescribeCluster",
        61 => "DescribeProducers",
        65 => "DescribeTransactions",
        66 => "ListTransactions",
        68 => "ConsumerGroupHeartbeat",
        69 => "ConsumerGroupDescribe",
        75 => "DescribeTopicPartitions",
        _ => "Unknown",
    }
}

/// Summarise a Kafka request frame as a short human-readable string.
///
/// `buf` must start at the 4-byte big-endian length prefix. Returns `None` when
/// fewer than 12 bytes are available, when the length prefix is negative, or
/// when the buffer does not yet hold the whole frame the prefix announces.
///
/// The summary is `"<ApiName> v<version>"`, with ` topic=<name>` appended for
/// Produce, Fetch, CreateTopics and DeleteTopics when a topic-like string can be
/// located. Unrecognised keys are rendered as `"ApiKey(<n>) v<version>"`.
pub fn parse_kafka_request(buf: &[u8]) -> Option<String> {
    if buf.len() < 12 {
        return None;
    }
    let declared = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // A negative prefix cannot describe a frame. Casting it to usize would yield a
    // huge value and make the `4 + length` comparison overflow.
    if declared < 0 {
        return None;
    }
    // buf.len() >= 12 here, so the subtraction cannot underflow.
    if buf.len() - 4 < declared as usize {
        return None;
    }

    let api_key = i16::from_be_bytes([buf[4], buf[5]]);
    let api_version = i16::from_be_bytes([buf[6], buf[7]]);
    let name = api_key_name(api_key);

    // Topic extraction is heuristic; when it fails we fall through to the plain summary.
    let detail = match api_key {
        0 => extract_produce_topic(buf).map(|t| format!("Produce v{} topic={}", api_version, t)),
        1 => extract_fetch_topic(buf).map(|t| format!("Fetch v{} topic={}", api_version, t)),
        3 => Some(format!("Metadata v{}", api_version)),
        18 => Some(format!("ApiVersions v{}", api_version)),
        19 => extract_topic_after_client_id(buf)
            .map(|t| format!("CreateTopics v{} topic={}", api_version, t)),
        20 => extract_topic_after_client_id(buf)
            .map(|t| format!("DeleteTopics v{} topic={}", api_version, t)),
        _ if name == "Unknown" => Some(format!("ApiKey({}) v{}", api_key, api_version)),
        _ => None,
    };

    Some(detail.unwrap_or_else(|| format!("{} v{}", name, api_version)))
}

/// Summarise a Kafka response frame.
///
/// A response is `[length:4][correlation_id:4][payload...]`. Without the matching
/// request's api_key the payload cannot be interpreted, so only its size is
/// reported, as `"OK (<n> bytes)"` where `n` is the declared length minus the
/// 4-byte correlation id.
///
/// Returns `None` when fewer than 8 bytes are available or when the length
/// prefix is negative (a malformed frame).
pub fn parse_kafka_response(buf: &[u8]) -> Option<String> {
    if buf.len() < 8 {
        return None;
    }
    let declared = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // A negative prefix would sign-extend to an enormous usize and be reported as
    // a multi-exabyte payload; treat it as unparseable instead.
    if declared < 0 {
        return None;
    }
    let payload_size = (declared as usize).saturating_sub(4);
    Some(format!("OK ({} bytes)", payload_size))
}

/// Detailed rendering of a Kafka response.
///
/// Currently identical to the summary: this simply forwards `buf` to
/// [`parse_kafka_response`] and returns whatever it produces.
pub fn format_kafka_response_detail(buf: &[u8]) -> Option<String> {
    let detail = parse_kafka_response(buf);
    detail
}

/// Render a request summary and, for Produce requests, the message bodies.
///
/// Returns `None` when `buf` is shorter than 12 bytes or the request summary
/// cannot be produced. For every api_key other than Produce (0) the result is
/// the summary alone. For Produce, the summary is followed by a newline and
/// either the structurally parsed record values (one per line) or, failing
/// that, the first readable chunk of text found in the payload.
pub fn extract_kafka_full_command(buf: &[u8]) -> Option<String> {
    if buf.len() < 12 {
        return None;
    }
    let api_key = i16::from_be_bytes([buf[4], buf[5]]);
    let summary = parse_kafka_request(buf)?;

    if api_key != 0 {
        return Some(summary);
    }

    // Structured RecordBatch parsing wins; the printable-text scan is the fallback.
    let body = extract_produce_records(buf)
        .filter(|records| !records.is_empty())
        .map(|records| records.join("\n"))
        .or_else(|| extract_readable_payload(buf));

    Some(match body {
        Some(text) => format!("{}\n{}", summary, text),
        None => summary,
    })
}

/// Heuristically pull message-like text out of a raw frame.
///
/// Scanning starts at byte 40 (or the end of `buf` if shorter) and walks runs of
/// printable ASCII (`0x20..=0x7E`). Runs shorter than 10 bytes are ignored. The
/// first qualifying run that contains `{` or `[` with at least 5 bytes from that
/// character to the end of the run is returned immediately, starting at that
/// character. Otherwise the longest qualifying run is returned (the earliest
/// one on ties), or `None` when there is none.
fn extract_readable_payload(buf: &[u8]) -> Option<String> {
    let is_printable = |b: u8| (0x20..0x7F).contains(&b);

    let mut longest: Option<&[u8]> = None;
    let mut cursor = 40.min(buf.len());

    while cursor < buf.len() {
        if !is_printable(buf[cursor]) {
            cursor += 1;
            continue;
        }

        // Measure the printable run that begins here and step past it.
        let run_len = buf[cursor..]
            .iter()
            .take_while(|&&b| is_printable(b))
            .count();
        let run = &buf[cursor..cursor + run_len];
        cursor += run_len;

        if run.len() < 10 {
            continue;
        }

        // Anything that looks like the start of JSON is preferred over plain text.
        if let Some(at) = run.iter().position(|&b| b == b'{' || b == b'[') {
            let tail = &run[at..];
            if tail.len() >= 5 {
                return Some(String::from_utf8_lossy(tail).into_owned());
            }
        }

        match longest {
            Some(prev) if prev.len() >= run.len() => {}
            _ => longest = Some(run),
        }
    }

    longest.map(|bytes| String::from_utf8_lossy(bytes).into_owned())
}

/// Extract record values from a Produce request by locating a v2 RecordBatch.
///
/// RecordBatch layout, with byte offsets from the start of the batch:
///
/// | field                | size | offset |
/// |----------------------|------|--------|
/// | baseOffset           | 8    | 0      |
/// | batchLength          | 4    | 8      |
/// | partitionLeaderEpoch | 4    | 12     |
/// | magic (0x02)         | 1    | 16     |
/// | crc                  | 4    | 17     |
/// | attributes           | 2    | 21     |
/// | lastOffsetDelta      | 4    | 23     |
/// | baseTimestamp        | 8    | 27     |
/// | maxTimestamp         | 8    | 35     |
/// | producerId           | 8    | 43     |
/// | producerEpoch        | 2    | 51     |
/// | baseSequence         | 4    | 53     |
/// | recordCount          | 4    | 57     |
/// | records              | ...  | 61     |
///
/// The function scans `buf` from byte 12 for a position that looks like a batch
/// start: magic byte 2, a plausible `batchLength` that fits in the buffer, and a
/// `baseOffset` of zero. The first candidate that also has a plausible record
/// count decides the result: its record values are returned, or `None` if none
/// could be decoded. Compressed batches (low three attribute bits non-zero) are
/// decompressed first. Returns `None` when no candidate is found.
fn extract_produce_records(buf: &[u8]) -> Option<Vec<String>> {
    const BATCH_LENGTH_AT: usize = 8;
    const MAGIC_AT: usize = 16;
    const ATTRIBUTES_AT: usize = 21;
    const RECORD_COUNT_AT: usize = 57;
    const RECORDS_AT: usize = 61;
    // batchLength counts the bytes that follow the batchLength field itself.
    const LENGTH_PREFIX: usize = 12;

    let be_i32 =
        |at: usize| i32::from_be_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]]);

    // The loop bound keeps every fixed-header read below in range
    // (the highest index touched is pos + 60).
    for pos in 12..buf.len().saturating_sub(RECORDS_AT) {
        if buf[pos + MAGIC_AT] != 0x02 {
            continue;
        }

        // 49 is the smallest possible batchLength: the 61-byte header minus the
        // 12 bytes that precede the counted region.
        let batch_len = be_i32(pos + BATCH_LENGTH_AT);
        if !(49..=10_000_000).contains(&batch_len) {
            continue;
        }
        let batch_end = pos + LENGTH_PREFIX + batch_len as usize;
        if batch_end > buf.len() {
            continue;
        }

        // Producers send batches with baseOffset 0; anything else is a false match.
        let mut raw_offset = [0u8; 8];
        raw_offset.copy_from_slice(&buf[pos..pos + 8]);
        if i64::from_be_bytes(raw_offset) != 0 {
            continue;
        }

        let attributes = i16::from_be_bytes([buf[pos + ATTRIBUTES_AT], buf[pos + ATTRIBUTES_AT + 1]]);
        let compression = attributes & 0x07;

        // recordCount follows baseSequence; reading at +53 would pick up baseSequence.
        let record_count = be_i32(pos + RECORD_COUNT_AT);
        if !(0..=100_000).contains(&record_count) {
            continue;
        }

        let records_start = pos + RECORDS_AT;
        let values = if compression != 0 {
            if records_start >= batch_end {
                return None;
            }
            let inflated = decompress_kafka(compression, &buf[records_start..batch_end])?;
            collect_record_values(&inflated, 0, inflated.len(), record_count)
        } else {
            collect_record_values(buf, records_start, batch_end, record_count)
        };

        return if values.is_empty() { None } else { Some(values) };
    }

    None
}

/// Decode up to `count` consecutive records from `data[start..end]`, keeping the
/// non-empty values. Stops early at the first record that fails to parse.
fn collect_record_values(data: &[u8], start: usize, end: usize, count: i32) -> Vec<String> {
    let mut values = Vec::new();
    let mut at = start;
    for _ in 0..count {
        if at >= end {
            break;
        }
        match parse_record(data, at, end) {
            Some((value, consumed)) => {
                values.extend(value);
                at += consumed;
            }
            None => break,
        }
    }
    values
}

/// Parse a single uncompressed Record out of a RecordBatch.
///
/// Record layout:
/// `length(varint) + attributes(1) + timestampDelta(varint) + offsetDelta(varint)
///  + keyLength(varint) + key + valueLength(varint) + value + headersCount(varint) + ...`
///
/// `start` is the offset of the record's length varint and `end` is the exclusive
/// limit of the records region within `buf`.
///
/// Returns `Some((value, consumed))` where `consumed` is the number of bytes from
/// `start` to the end of the record (length varint included) and `value` is the
/// record value decoded lossily as UTF-8, or `None` for a null/empty value or one
/// that does not fit inside the record. Returns `None` when the record length is
/// negative or exceeds `end`, or when a varint cannot be decoded.
fn parse_record(buf: &[u8], start: usize, end: usize) -> Option<(Option<String>, usize)> {
    if start >= end {
        return None;
    }
    let mut pos = start;

    // record length (varint)
    let (record_len, n) = decode_varint(buf, pos)?;
    pos += n;
    // The length comes from untrusted bytes: a negative value would turn into a
    // huge usize and overflow the arithmetic below, so reject it up front.
    if record_len < 0 || pos > end {
        return None;
    }
    if record_len as u64 > (end - pos) as u64 {
        return None;
    }
    let record_end = pos + record_len as usize;
    let consumed = record_end - start;

    // attributes (1 byte); a zero-length record has nothing more to read.
    if pos >= record_end {
        return Some((None, consumed));
    }
    pos += 1;

    // timestampDelta and offsetDelta (varints) are skipped.
    for _ in 0..2 {
        let (_, n) = decode_varint(buf, pos)?;
        pos += n;
    }

    // keyLength (varint): -1 means null, so only positive lengths advance.
    let (key_len, n) = decode_varint(buf, pos)?;
    pos += n;
    if key_len > 0 {
        // A key that overruns its own record means the record is corrupt; skip it
        // by its declared length instead of walking off into unrelated bytes.
        if key_len as u64 > record_end.saturating_sub(pos) as u64 {
            return Some((None, consumed));
        }
        pos += key_len as usize;
    }

    // valueLength (varint)
    let (value_len, n) = decode_varint(buf, pos)?;
    pos += n;

    let fits = value_len > 0 && value_len as u64 <= record_end.saturating_sub(pos) as u64;
    let value = if fits {
        buf.get(pos..pos + value_len as usize)
            .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
    } else {
        None
    };

    Some((value, consumed))
}

/// Decode one zigzag-encoded base-128 varint from `buf` beginning at `start`.
///
/// Returns the decoded signed value together with the number of bytes it
/// occupied. Returns `None` if `buf` ends before a terminating byte (high bit
/// clear) is seen, or if the encoding runs past ten bytes.
fn decode_varint(buf: &[u8], start: usize) -> Option<(i64, usize)> {
    let tail = buf.get(start..)?;
    let mut raw: u64 = 0;

    for (idx, &byte) in tail.iter().enumerate() {
        let shift = 7 * idx;
        if shift > 63 {
            return None;
        }
        raw |= u64::from(byte & 0x7F) << shift;

        if byte & 0x80 == 0 {
            // Undo zigzag: even values map to non-negatives, odd values to negatives.
            let value = ((raw >> 1) as i64) ^ -((raw & 1) as i64);
            return Some((value, idx + 1));
        }
    }

    None
}

/// Report whether `buf` holds at least one whole length-prefixed Kafka frame.
///
/// The first four bytes are a big-endian signed length that counts the bytes
/// following the prefix. Returns `false` when fewer than four bytes are
/// available, when the declared length is negative (never a valid frame), or
/// when the buffer is shorter than prefix plus declared length.
pub fn kafka_frame_complete(buf: &[u8]) -> bool {
    if buf.len() < 4 {
        return false;
    }
    let declared = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    // Casting a negative length to usize gives a value near usize::MAX, and adding
    // 4 to it overflows. Reject it before doing any arithmetic.
    if declared < 0 {
        return false;
    }
    // buf.len() >= 4 here, so the subtraction cannot underflow.
    buf.len() - 4 >= declared as usize
}

/// Best-effort topic name for a Produce request.
///
/// The body after the header is
/// `[transactional_id][acks:2][timeout:4][topic_count:4][topic_name...]`, but
/// rather than decode it field by field this delegates to the generic scan for
/// the first topic-like string following the client id.
fn extract_produce_topic(buf: &[u8]) -> Option<String> {
    let topic = extract_topic_after_client_id(buf);
    topic
}

/// Best-effort topic name for a Fetch request.
///
/// Delegates to the generic scan for the first topic-like string following the
/// client id.
fn extract_fetch_topic(buf: &[u8]) -> Option<String> {
    let topic = extract_topic_after_client_id(buf);
    topic
}

/// Step over the request header and client id, then search for a topic-like string.
///
/// The fixed header is length(4) + api_key(2) + version(2) + correlation_id(4),
/// followed by the client id as a nullable string `[len:2][bytes...]`; a negative
/// length denotes null and occupies only the two length bytes. Returns `None`
/// when `buf` is too short to contain the client id length.
///
/// What follows the client id varies by api_key and version, so the search is a
/// heuristic rather than a real decode.
fn extract_topic_after_client_id(buf: &[u8]) -> Option<String> {
    const HEADER_LEN: usize = 12;

    let len_field = buf.get(HEADER_LEN..HEADER_LEN + 2)?;
    let client_id_len = i16::from_be_bytes([len_field[0], len_field[1]]);

    // Null (negative) client ids contribute no bytes beyond the length field.
    let payload_start = HEADER_LEN + 2 + client_id_len.max(0) as usize;

    find_first_string(buf, payload_start)
}

/// Locate the first plausible length-prefixed topic name at or shortly after `pos`.
///
/// Each of the 20 byte offsets starting at `pos` is tried in turn as the start of
/// a `[len:2][bytes...]` string. A candidate is accepted when its length is in
/// `1..256`, it fits inside `buf`, it is valid UTF-8 longer than one byte, and
/// every character is alphanumeric or one of `-`, `_`, `.`. Probing stops as soon
/// as fewer than two bytes remain. Returns `None` when nothing qualifies.
fn find_first_string(buf: &[u8], pos: usize) -> Option<String> {
    const PROBE_WINDOW: usize = 20;

    (pos..pos + PROBE_WINDOW)
        .take_while(|&p| p + 2 <= buf.len())
        .find_map(|p| {
            let declared = i16::from_be_bytes([buf[p], buf[p + 1]]);
            if declared <= 0 || declared >= 256 {
                return None;
            }
            let body = buf.get(p + 2..p + 2 + declared as usize)?;
            let text = std::str::from_utf8(body).ok()?;

            let topic_like = text.len() > 1
                && text
                    .chars()
                    .all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.');
            if topic_like {
                Some(text.to_string())
            } else {
                None
            }
        })
}

/// Decompress Kafka record batch data.
/// Compression types: 1=gzip, 2=snappy, 3=lz4, 4=zstd
fn decompress_kafka(compression: i16, data: &[u8]) -> Option<Vec<u8>> {
    match compression {
        1 => {
            use std::io::Read;
            let mut decoder = flate2::read::GzDecoder::new(data);
            let mut out = Vec::new();
            decoder.read_to_end(&mut out).ok()?;
            Some(out)
        }
        2 => {
            snap::raw::Decoder::new().decompress_vec(data).ok()
        }
        3 => {
            // Kafka LZ4 uses the standard frame format
            use std::io::Read;
            lz4_flex::frame::FrameDecoder::new(data)
                .bytes().collect::<Result<Vec<u8>, _>>().ok()
        }
        4 => {
            zstd::decode_all(data).ok()
        }
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal request frame: length prefix, header and client id, no body.
    fn make_request(api_key: i16, api_version: i16, client_id: &str) -> Vec<u8> {
        let id = client_id.as_bytes();
        // api_key(2) + api_version(2) + correlation_id(4) + client_id length(2) + client_id
        let frame_len = 2 + 2 + 4 + 2 + id.len();

        let mut frame = Vec::new();
        frame.extend_from_slice(&(frame_len as i32).to_be_bytes());
        frame.extend_from_slice(&api_key.to_be_bytes());
        frame.extend_from_slice(&api_version.to_be_bytes());
        frame.extend_from_slice(&1i32.to_be_bytes()); // correlation_id
        frame.extend_from_slice(&(id.len() as i16).to_be_bytes());
        frame.extend_from_slice(id);
        frame
    }

    #[test]
    fn test_parse_metadata_request() {
        let frame = make_request(3, 12, "my-app");
        let summary = parse_kafka_request(&frame);
        assert_eq!(summary, Some("Metadata v12".into()));
    }

    #[test]
    fn test_parse_api_versions() {
        let frame = make_request(18, 3, "kafka-client");
        let summary = parse_kafka_request(&frame);
        assert_eq!(summary, Some("ApiVersions v3".into()));
    }

    #[test]
    fn test_parse_response() {
        // length = 20: correlation_id (4) + payload (16)
        let mut frame = vec![0, 0, 0, 20];
        frame.extend_from_slice(&1i32.to_be_bytes());
        frame.extend_from_slice(&[0u8; 16]);
        assert_eq!(parse_kafka_response(&frame), Some("OK (16 bytes)".into()));
    }

    #[test]
    fn test_frame_complete() {
        // length = 4, followed by exactly four bytes
        let mut frame = vec![0, 0, 0, 4];
        frame.extend_from_slice(&[0u8; 4]);
        assert!(kafka_frame_complete(&frame));
        // Truncating the same frame makes it incomplete.
        assert!(!kafka_frame_complete(&frame[..6]));
    }

    #[test]
    fn test_frame_incomplete() {
        // Declares 10 bytes but only 2 follow the prefix.
        let truncated = [0, 0, 0, 10, 0, 0];
        assert!(!kafka_frame_complete(&truncated));
    }
}