Skip to main content

ocular_protocol/
amqp.rs

1//! AMQP 0-9-1 wire protocol parser
2//!
3//! Frame format: [type:1][channel:2][size:4][payload...][frame_end:0xCE]
4//! Method frame payload: [class_id:2][method_id:2][arguments...]
5
6const FRAME_METHOD: u8 = 1;
7const FRAME_HEADER: u8 = 2;
8const FRAME_BODY: u8 = 3;
9const FRAME_HEARTBEAT: u8 = 8;
10const FRAME_END: u8 = 0xCE;
11
12#[derive(Debug, Clone)]
13pub struct AmqpFrame {
14    pub frame_type: u8,
15    pub channel: u16,
16    pub method: Option<AmqpMethod>,
17    pub body: Option<Vec<u8>>,
18}
19
20#[derive(Debug, Clone)]
21pub struct AmqpMethod {
22    pub class_id: u16,
23    pub method_id: u16,
24    pub summary: String,
25    pub detail: String,
26}
27
28/// Parse the first complete AMQP frame from buffer. Returns None if incomplete.
29pub fn parse_amqp_frame(buf: &[u8]) -> Option<AmqpFrame> {
30    // Protocol header (client sends "AMQP\x00\x00\x09\x01")
31    if buf.len() >= 8 && &buf[0..4] == b"AMQP" {
32        return Some(AmqpFrame {
33            frame_type: 0,
34            channel: 0,
35            method: Some(AmqpMethod {
36                class_id: 0,
37                method_id: 0,
38                summary: "AMQP Protocol Header".into(),
39                detail: format!("AMQP {}.{}.{}.{}", buf[4], buf[5], buf[6], buf[7]),
40            }),
41            body: None,
42        });
43    }
44
45    if buf.len() < 8 { return None; }
46    let frame_type = buf[0];
47    let channel = u16::from_be_bytes([buf[1], buf[2]]);
48    let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
49    let total = 7 + size + 1;
50    if buf.len() < total { return None; }
51    if buf[total - 1] != FRAME_END { return None; }
52
53    let payload = &buf[7..7 + size];
54
55    match frame_type {
56        FRAME_METHOD => {
57            if size < 4 { return None; }
58            let class_id = u16::from_be_bytes([payload[0], payload[1]]);
59            let method_id = u16::from_be_bytes([payload[2], payload[3]]);
60            let args = &payload[4..];
61            let (summary, detail) = decode_method(class_id, method_id, args);
62            Some(AmqpFrame {
63                frame_type,
64                channel,
65                method: Some(AmqpMethod { class_id, method_id, summary, detail }),
66                body: None,
67            })
68        }
69        FRAME_HEADER => Some(AmqpFrame {
70            frame_type, channel, method: None,
71            body: Some(payload.to_vec()),
72        }),
73        FRAME_BODY => Some(AmqpFrame {
74            frame_type, channel, method: None,
75            body: Some(payload.to_vec()),
76        }),
77        FRAME_HEARTBEAT => Some(AmqpFrame {
78            frame_type, channel,
79            method: Some(AmqpMethod {
80                class_id: 0, method_id: 0,
81                summary: "Heartbeat".into(),
82                detail: "Heartbeat".into(),
83            }),
84            body: None,
85        }),
86        _ => None,
87    }
88}
89
90/// Returns the byte length of the first frame in buf, or None if incomplete.
91pub fn frame_len(buf: &[u8]) -> Option<usize> {
92    if buf.len() >= 8 && &buf[0..4] == b"AMQP" {
93        return Some(8);
94    }
95    if buf.len() < 7 { return None; }
96    let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
97    let total = 7 + size + 1;
98    if buf.len() < total { return None; }
99    Some(total)
100}
101
102/// Returns true if this method is "fire-and-forget" (server sends no response).
103/// Basic.Publish (60,40), Basic.Ack (60,80), Basic.Reject (60,90), Basic.Nack (60,120)
104pub fn is_async_method(class_id: u16, method_id: u16) -> bool {
105    matches!((class_id, method_id),
106        (60, 40) | (60, 50) | (60, 60) | (60, 80) | (60, 90) | (60, 120))
107}
108
109/// Parse a client→server buffer that may contain multiple frames.
110/// For publish: Method + Header + Body. Returns (summary, detail_with_body).
111pub fn parse_amqp_request_full(buf: &[u8]) -> Option<(String, String)> {
112    let frame = parse_amqp_frame(buf)?;
113    let method = frame.method.as_ref()?;
114    let summary = method.summary.clone();
115    let mut detail = method.detail.clone();
116
117    // Try to extract body from subsequent frames in the same buffer
118    if let Some(first_len) = frame_len(buf) {
119        let mut pos = first_len;
120        while let Some(flen) = frame_len(&buf[pos..]) {
121            if let Some(f) = parse_amqp_frame(&buf[pos..]) {
122                if f.frame_type == FRAME_BODY {
123                    if let Some(body) = &f.body {
124                        let text = String::from_utf8_lossy(body);
125                        detail = format!("{}\nBody: {}", detail, text);
126                    }
127                }
128            }
129            pos += flen;
130        }
131    }
132
133    Some((summary, detail))
134}
135
136/// Parse a client→server buffer, returning a summary for the event list
137pub fn parse_amqp_request(buf: &[u8]) -> Option<String> {
138    let frame = parse_amqp_frame(buf)?;
139    frame.method.map(|m| m.summary)
140}
141
142/// Parse a server→client buffer, returning a summary
143pub fn parse_amqp_response(buf: &[u8]) -> Option<String> {
144    let frame = parse_amqp_frame(buf)?;
145    match frame.frame_type {
146        FRAME_BODY => {
147            let body = frame.body.unwrap_or_default();
148            let text = String::from_utf8_lossy(&body);
149            let truncated: String = text.chars().take(80).collect();
150            Some(format!("Body: {}", truncated))
151        }
152        _ => frame.method.map(|m| m.summary),
153    }
154}
155
156/// Parse response with full detail for the detail panel
157pub fn format_amqp_response_detail(buf: &[u8]) -> Option<String> {
158    let frame = parse_amqp_frame(buf)?;
159    match frame.frame_type {
160        FRAME_BODY => {
161            let body = frame.body.unwrap_or_default();
162            Some(String::from_utf8_lossy(&body).to_string())
163        }
164        _ => frame.method.map(|m| m.detail),
165    }
166}
167
168/// Read a short string: [len:1][data...]
169fn read_short_str(buf: &[u8]) -> Option<(String, usize)> {
170    if buf.is_empty() { return None; }
171    let len = buf[0] as usize;
172    if buf.len() < 1 + len { return None; }
173    Some((String::from_utf8_lossy(&buf[1..1 + len]).to_string(), 1 + len))
174}
175
176fn decode_method(class_id: u16, method_id: u16, args: &[u8]) -> (String, String) {
177    match (class_id, method_id) {
178        // Connection
179        (10, 10) => ("Connection.Start".into(), "Connection.Start".into()),
180        (10, 11) => ("Connection.StartOk".into(), "Connection.StartOk".into()),
181        (10, 30) => ("Connection.Tune".into(), format_tune(args)),
182        (10, 31) => ("Connection.TuneOk".into(), format_tune(args)),
183        (10, 40) => {
184            let vhost = read_short_str(args).map(|(s, _)| s).unwrap_or_default();
185            (format!("Connection.Open vhost={}", vhost), format!("Connection.Open vhost={}", vhost))
186        }
187        (10, 41) => ("Connection.OpenOk".into(), "Connection.OpenOk".into()),
188        (10, 50) => ("Connection.Close".into(), format_close(args)),
189        (10, 51) => ("Connection.CloseOk".into(), "Connection.CloseOk".into()),
190        // Channel
191        (20, 10) => ("Channel.Open".into(), "Channel.Open".into()),
192        (20, 11) => ("Channel.OpenOk".into(), "Channel.OpenOk".into()),
193        (20, 40) => ("Channel.Close".into(), format_close(args)),
194        (20, 41) => ("Channel.CloseOk".into(), "Channel.CloseOk".into()),
195        // Exchange
196        (40, 10) => {
197            let (exchange, detail) = parse_exchange_declare(args);
198            (format!("Exchange.Declare {}", exchange), detail)
199        }
200        (40, 11) => ("Exchange.DeclareOk".into(), "Exchange.DeclareOk".into()),
201        (40, 20) => ("Exchange.Delete".into(), "Exchange.Delete".into()),
202        (40, 21) => ("Exchange.DeleteOk".into(), "Exchange.DeleteOk".into()),
203        // Queue
204        (50, 10) => {
205            let (queue, detail) = parse_queue_declare(args);
206            (format!("Queue.Declare {}", queue), detail)
207        }
208        (50, 11) => parse_queue_declare_ok(args),
209        (50, 20) => {
210            let (detail_str, summary) = parse_queue_bind(args);
211            (summary, detail_str)
212        }
213        (50, 21) => ("Queue.BindOk".into(), "Queue.BindOk".into()),
214        (50, 30) => ("Queue.Purge".into(), "Queue.Purge".into()),
215        (50, 31) => ("Queue.PurgeOk".into(), "Queue.PurgeOk".into()),
216        (50, 40) => ("Queue.Delete".into(), "Queue.Delete".into()),
217        (50, 41) => ("Queue.DeleteOk".into(), "Queue.DeleteOk".into()),
218        // Basic
219        (60, 10) => ("Basic.Qos".into(), "Basic.Qos".into()),
220        (60, 11) => ("Basic.QosOk".into(), "Basic.QosOk".into()),
221        (60, 20) => {
222            let (summary, detail) = parse_basic_consume(args);
223            (summary, detail)
224        }
225        (60, 21) => ("Basic.ConsumeOk".into(), "Basic.ConsumeOk".into()),
226        (60, 30) => ("Basic.Cancel".into(), "Basic.Cancel".into()),
227        (60, 31) => ("Basic.CancelOk".into(), "Basic.CancelOk".into()),
228        (60, 40) => {
229            let (summary, detail) = parse_basic_publish(args);
230            (summary, detail)
231        }
232        (60, 50) => ("Basic.Return".into(), "Basic.Return".into()),
233        (60, 60) => {
234            let (summary, detail) = parse_basic_deliver(args);
235            (summary, detail)
236        }
237        (60, 70) => ("Basic.Get".into(), parse_basic_get(args)),
238        (60, 71) => ("Basic.GetOk".into(), "Basic.GetOk".into()),
239        (60, 72) => ("Basic.GetEmpty".into(), "Basic.GetEmpty".into()),
240        (60, 80) => ("Basic.Ack".into(), "Basic.Ack".into()),
241        (60, 90) => ("Basic.Reject".into(), "Basic.Reject".into()),
242        (60, 120) => ("Basic.Nack".into(), "Basic.Nack".into()),
243        // Confirm
244        (85, 10) => ("Confirm.Select".into(), "Confirm.Select".into()),
245        (85, 11) => ("Confirm.SelectOk".into(), "Confirm.SelectOk".into()),
246        // Tx
247        (90, 10) => ("Tx.Select".into(), "Tx.Select".into()),
248        (90, 11) => ("Tx.SelectOk".into(), "Tx.SelectOk".into()),
249        (90, 20) => ("Tx.Commit".into(), "Tx.Commit".into()),
250        (90, 21) => ("Tx.CommitOk".into(), "Tx.CommitOk".into()),
251        (90, 30) => ("Tx.Rollback".into(), "Tx.Rollback".into()),
252        (90, 31) => ("Tx.RollbackOk".into(), "Tx.RollbackOk".into()),
253        _ => {
254            let s = format!("Method({}.{})", class_id, method_id);
255            (s.clone(), s)
256        }
257    }
258}
259
260fn format_tune(args: &[u8]) -> String {
261    if args.len() < 8 { return "Connection.Tune".into(); }
262    let channel_max = u16::from_be_bytes([args[0], args[1]]);
263    let frame_max = u32::from_be_bytes([args[2], args[3], args[4], args[5]]);
264    let heartbeat = u16::from_be_bytes([args[6], args[7]]);
265    format!("channel_max={} frame_max={} heartbeat={}", channel_max, frame_max, heartbeat)
266}
267
268fn format_close(args: &[u8]) -> String {
269    if args.len() < 4 { return "Close".into(); }
270    let code = u16::from_be_bytes([args[0], args[1]]);
271    let reason = read_short_str(&args[2..]).map(|(s, _)| s).unwrap_or_default();
272    format!("code={} reason={}", code, reason)
273}
274
275fn parse_exchange_declare(args: &[u8]) -> (String, String) {
276    // [reserved:2][exchange:short_str][type:short_str][flags:1][table]
277    if args.len() < 3 { return (String::new(), "Exchange.Declare".into()); }
278    let rest = &args[2..]; // skip reserved
279    let (exchange, consumed) = read_short_str(rest).unwrap_or_default();
280    let rest = &rest[consumed..];
281    let (ex_type, _) = read_short_str(rest).unwrap_or_default();
282    let detail = format!("Exchange.Declare exchange={} type={}", exchange, ex_type);
283    (exchange, detail)
284}
285
286fn parse_queue_declare(args: &[u8]) -> (String, String) {
287    // [reserved:2][queue:short_str][flags:1][table]
288    if args.len() < 3 { return (String::new(), "Queue.Declare".into()); }
289    let rest = &args[2..];
290    let (queue, _) = read_short_str(rest).unwrap_or_default();
291    let detail = format!("Queue.Declare queue={}", queue);
292    (queue, detail)
293}
294
295fn parse_queue_declare_ok(args: &[u8]) -> (String, String) {
296    let (queue, consumed) = read_short_str(args).unwrap_or_default();
297    let rest = &args[consumed..];
298    let (msg_count, consumer_count) = if rest.len() >= 8 {
299        (u32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]),
300         u32::from_be_bytes([rest[4], rest[5], rest[6], rest[7]]))
301    } else { (0, 0) };
302    let summary = format!("Queue.DeclareOk {}", queue);
303    let detail = format!("Queue.DeclareOk queue={} messages={} consumers={}", queue, msg_count, consumer_count);
304    (summary, detail)
305}
306
307fn parse_queue_bind(args: &[u8]) -> (String, String) {
308    // [reserved:2][queue:short_str][exchange:short_str][routing_key:short_str]...
309    if args.len() < 3 { return ("Queue.Bind".into(), "Queue.Bind".into()); }
310    let rest = &args[2..];
311    let (queue, c1) = read_short_str(rest).unwrap_or_default();
312    let rest = &rest[c1..];
313    let (exchange, c2) = read_short_str(rest).unwrap_or_default();
314    let rest = &rest[c2..];
315    let (routing_key, _) = read_short_str(rest).unwrap_or_default();
316    let summary = format!("Queue.Bind {} → {} ({})", queue, exchange, routing_key);
317    let detail = format!("Queue.Bind queue={} exchange={} routing_key={}", queue, exchange, routing_key);
318    (detail, summary)
319}
320
321fn parse_basic_publish(args: &[u8]) -> (String, String) {
322    // [reserved:2][exchange:short_str][routing_key:short_str][flags:1]
323    if args.len() < 3 { return ("Basic.Publish".into(), "Basic.Publish".into()); }
324    let rest = &args[2..];
325    let (exchange, c1) = read_short_str(rest).unwrap_or_default();
326    let rest = &rest[c1..];
327    let (routing_key, _) = read_short_str(rest).unwrap_or_default();
328    let ex = if exchange.is_empty() { "(default)" } else { &exchange };
329    let summary = format!("Basic.Publish → {} key={}", ex, routing_key);
330    let detail = format!("Basic.Publish exchange={} routing_key={}", ex, routing_key);
331    (summary, detail)
332}
333
334fn parse_basic_deliver(args: &[u8]) -> (String, String) {
335    // [consumer_tag:short_str][delivery_tag:8][redelivered:1][exchange:short_str][routing_key:short_str]
336    let (consumer_tag, c1) = read_short_str(args).unwrap_or_default();
337    let rest = &args[c1..];
338    if rest.len() < 9 { return ("Basic.Deliver".into(), "Basic.Deliver".into()); }
339    let delivery_tag = u64::from_be_bytes([rest[0], rest[1], rest[2], rest[3], rest[4], rest[5], rest[6], rest[7]]);
340    let rest = &rest[9..]; // skip delivery_tag + redelivered
341    let (exchange, c2) = read_short_str(rest).unwrap_or_default();
342    let rest = &rest[c2..];
343    let (routing_key, _) = read_short_str(rest).unwrap_or_default();
344    let ex = if exchange.is_empty() { "(default)" } else { &exchange };
345    let summary = format!("Basic.Deliver ← {} key={}", ex, routing_key);
346    let detail = format!("Basic.Deliver consumer={} delivery_tag={} exchange={} routing_key={}", consumer_tag, delivery_tag, exchange, routing_key);
347    (summary, detail)
348}
349
350fn parse_basic_consume(args: &[u8]) -> (String, String) {
351    // [reserved:2][queue:short_str][consumer_tag:short_str]...
352    if args.len() < 3 { return ("Basic.Consume".into(), "Basic.Consume".into()); }
353    let rest = &args[2..];
354    let (queue, c1) = read_short_str(rest).unwrap_or_default();
355    let rest = &rest[c1..];
356    let (consumer_tag, _) = read_short_str(rest).unwrap_or_default();
357    let summary = format!("Basic.Consume {}", queue);
358    let detail = format!("Basic.Consume queue={} consumer_tag={}", queue, consumer_tag);
359    (summary, detail)
360}
361
362fn parse_basic_get(args: &[u8]) -> String {
363    // [reserved:2][queue:short_str][no_ack:1]
364    if args.len() < 3 { return "Basic.Get".into(); }
365    let rest = &args[2..];
366    let (queue, _) = read_short_str(rest).unwrap_or_default();
367    format!("Basic.Get queue={}", queue)
368}
369
370#[cfg(test)]
371mod tests {
372    use super::*;
373
374    #[test]
375    fn test_parse_protocol_header() {
376        let buf = b"AMQP\x00\x00\x09\x01";
377        let frame = parse_amqp_frame(buf).unwrap();
378        assert_eq!(frame.method.unwrap().summary, "AMQP Protocol Header");
379    }
380
381    #[test]
382    fn test_parse_heartbeat() {
383        // Heartbeat frame: type=8, channel=0, size=0, frame_end=0xCE
384        let buf = [8, 0, 0, 0, 0, 0, 0, 0xCE];
385        let frame = parse_amqp_frame(&buf).unwrap();
386        assert_eq!(frame.frame_type, FRAME_HEARTBEAT);
387        assert_eq!(frame.method.unwrap().summary, "Heartbeat");
388    }
389
390    #[test]
391    fn test_parse_basic_publish_frame() {
392        // Method frame: type=1, channel=1, class=60, method=40
393        // args: reserved(2) + exchange "test" + routing_key "rk"
394        let mut buf = Vec::new();
395        buf.push(1); // type
396        buf.extend_from_slice(&1u16.to_be_bytes()); // channel
397        let args: Vec<u8> = vec![
398            0, 0, // reserved
399            4, b't', b'e', b's', b't', // exchange
400            2, b'r', b'k', // routing_key
401            0, // flags
402        ];
403        let payload_len = 4 + args.len();
404        buf.extend_from_slice(&(payload_len as u32).to_be_bytes()); // size
405        buf.extend_from_slice(&60u16.to_be_bytes()); // class
406        buf.extend_from_slice(&40u16.to_be_bytes()); // method
407        buf.extend_from_slice(&args);
408        buf.push(0xCE);
409
410        let frame = parse_amqp_frame(&buf).unwrap();
411        let method = frame.method.unwrap();
412        assert!(method.summary.contains("Basic.Publish"));
413        assert!(method.summary.contains("test"));
414        assert!(method.summary.contains("rk"));
415    }
416}