1const FRAME_METHOD: u8 = 1;
7const FRAME_HEADER: u8 = 2;
8const FRAME_BODY: u8 = 3;
9const FRAME_HEARTBEAT: u8 = 8;
10const FRAME_END: u8 = 0xCE;
11
12#[derive(Debug, Clone)]
13pub struct AmqpFrame {
14 pub frame_type: u8,
15 pub channel: u16,
16 pub method: Option<AmqpMethod>,
17 pub body: Option<Vec<u8>>,
18}
19
20#[derive(Debug, Clone)]
21pub struct AmqpMethod {
22 pub class_id: u16,
23 pub method_id: u16,
24 pub summary: String,
25 pub detail: String,
26}
27
28pub fn parse_amqp_frame(buf: &[u8]) -> Option<AmqpFrame> {
30 if buf.len() >= 8 && &buf[0..4] == b"AMQP" {
32 return Some(AmqpFrame {
33 frame_type: 0,
34 channel: 0,
35 method: Some(AmqpMethod {
36 class_id: 0,
37 method_id: 0,
38 summary: "AMQP Protocol Header".into(),
39 detail: format!("AMQP {}.{}.{}.{}", buf[4], buf[5], buf[6], buf[7]),
40 }),
41 body: None,
42 });
43 }
44
45 if buf.len() < 8 { return None; }
46 let frame_type = buf[0];
47 let channel = u16::from_be_bytes([buf[1], buf[2]]);
48 let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
49 let total = 7 + size + 1;
50 if buf.len() < total { return None; }
51 if buf[total - 1] != FRAME_END { return None; }
52
53 let payload = &buf[7..7 + size];
54
55 match frame_type {
56 FRAME_METHOD => {
57 if size < 4 { return None; }
58 let class_id = u16::from_be_bytes([payload[0], payload[1]]);
59 let method_id = u16::from_be_bytes([payload[2], payload[3]]);
60 let args = &payload[4..];
61 let (summary, detail) = decode_method(class_id, method_id, args);
62 Some(AmqpFrame {
63 frame_type,
64 channel,
65 method: Some(AmqpMethod { class_id, method_id, summary, detail }),
66 body: None,
67 })
68 }
69 FRAME_HEADER => Some(AmqpFrame {
70 frame_type, channel, method: None,
71 body: Some(payload.to_vec()),
72 }),
73 FRAME_BODY => Some(AmqpFrame {
74 frame_type, channel, method: None,
75 body: Some(payload.to_vec()),
76 }),
77 FRAME_HEARTBEAT => Some(AmqpFrame {
78 frame_type, channel,
79 method: Some(AmqpMethod {
80 class_id: 0, method_id: 0,
81 summary: "Heartbeat".into(),
82 detail: "Heartbeat".into(),
83 }),
84 body: None,
85 }),
86 _ => None,
87 }
88}
89
90pub fn frame_len(buf: &[u8]) -> Option<usize> {
92 if buf.len() >= 8 && &buf[0..4] == b"AMQP" {
93 return Some(8);
94 }
95 if buf.len() < 7 { return None; }
96 let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
97 let total = 7 + size + 1;
98 if buf.len() < total { return None; }
99 Some(total)
100}
101
102pub fn is_async_method(class_id: u16, method_id: u16) -> bool {
105 matches!((class_id, method_id),
106 (60, 40) | (60, 50) | (60, 60) | (60, 80) | (60, 90) | (60, 120))
107}
108
109pub fn parse_amqp_request_full(buf: &[u8]) -> Option<(String, String)> {
112 let frame = parse_amqp_frame(buf)?;
113 let method = frame.method.as_ref()?;
114 let summary = method.summary.clone();
115 let mut detail = method.detail.clone();
116
117 if let Some(first_len) = frame_len(buf) {
119 let mut pos = first_len;
120 while let Some(flen) = frame_len(&buf[pos..]) {
121 if let Some(f) = parse_amqp_frame(&buf[pos..]) {
122 if f.frame_type == FRAME_BODY {
123 if let Some(body) = &f.body {
124 let text = String::from_utf8_lossy(body);
125 detail = format!("{}\nBody: {}", detail, text);
126 }
127 }
128 }
129 pos += flen;
130 }
131 }
132
133 Some((summary, detail))
134}
135
136pub fn parse_amqp_request(buf: &[u8]) -> Option<String> {
138 let frame = parse_amqp_frame(buf)?;
139 frame.method.map(|m| m.summary)
140}
141
142pub fn parse_amqp_response(buf: &[u8]) -> Option<String> {
144 let frame = parse_amqp_frame(buf)?;
145 match frame.frame_type {
146 FRAME_BODY => {
147 let body = frame.body.unwrap_or_default();
148 let text = String::from_utf8_lossy(&body);
149 let truncated: String = text.chars().take(80).collect();
150 Some(format!("Body: {}", truncated))
151 }
152 _ => frame.method.map(|m| m.summary),
153 }
154}
155
156pub fn format_amqp_response_detail(buf: &[u8]) -> Option<String> {
158 let frame = parse_amqp_frame(buf)?;
159 match frame.frame_type {
160 FRAME_BODY => {
161 let body = frame.body.unwrap_or_default();
162 Some(String::from_utf8_lossy(&body).to_string())
163 }
164 _ => frame.method.map(|m| m.detail),
165 }
166}
167
168fn read_short_str(buf: &[u8]) -> Option<(String, usize)> {
170 if buf.is_empty() { return None; }
171 let len = buf[0] as usize;
172 if buf.len() < 1 + len { return None; }
173 Some((String::from_utf8_lossy(&buf[1..1 + len]).to_string(), 1 + len))
174}
175
176fn decode_method(class_id: u16, method_id: u16, args: &[u8]) -> (String, String) {
177 match (class_id, method_id) {
178 (10, 10) => ("Connection.Start".into(), "Connection.Start".into()),
180 (10, 11) => ("Connection.StartOk".into(), "Connection.StartOk".into()),
181 (10, 30) => ("Connection.Tune".into(), format_tune(args)),
182 (10, 31) => ("Connection.TuneOk".into(), format_tune(args)),
183 (10, 40) => {
184 let vhost = read_short_str(args).map(|(s, _)| s).unwrap_or_default();
185 (format!("Connection.Open vhost={}", vhost), format!("Connection.Open vhost={}", vhost))
186 }
187 (10, 41) => ("Connection.OpenOk".into(), "Connection.OpenOk".into()),
188 (10, 50) => ("Connection.Close".into(), format_close(args)),
189 (10, 51) => ("Connection.CloseOk".into(), "Connection.CloseOk".into()),
190 (20, 10) => ("Channel.Open".into(), "Channel.Open".into()),
192 (20, 11) => ("Channel.OpenOk".into(), "Channel.OpenOk".into()),
193 (20, 40) => ("Channel.Close".into(), format_close(args)),
194 (20, 41) => ("Channel.CloseOk".into(), "Channel.CloseOk".into()),
195 (40, 10) => {
197 let (exchange, detail) = parse_exchange_declare(args);
198 (format!("Exchange.Declare {}", exchange), detail)
199 }
200 (40, 11) => ("Exchange.DeclareOk".into(), "Exchange.DeclareOk".into()),
201 (40, 20) => ("Exchange.Delete".into(), "Exchange.Delete".into()),
202 (40, 21) => ("Exchange.DeleteOk".into(), "Exchange.DeleteOk".into()),
203 (50, 10) => {
205 let (queue, detail) = parse_queue_declare(args);
206 (format!("Queue.Declare {}", queue), detail)
207 }
208 (50, 11) => parse_queue_declare_ok(args),
209 (50, 20) => {
210 let (detail_str, summary) = parse_queue_bind(args);
211 (summary, detail_str)
212 }
213 (50, 21) => ("Queue.BindOk".into(), "Queue.BindOk".into()),
214 (50, 30) => ("Queue.Purge".into(), "Queue.Purge".into()),
215 (50, 31) => ("Queue.PurgeOk".into(), "Queue.PurgeOk".into()),
216 (50, 40) => ("Queue.Delete".into(), "Queue.Delete".into()),
217 (50, 41) => ("Queue.DeleteOk".into(), "Queue.DeleteOk".into()),
218 (60, 10) => ("Basic.Qos".into(), "Basic.Qos".into()),
220 (60, 11) => ("Basic.QosOk".into(), "Basic.QosOk".into()),
221 (60, 20) => {
222 let (summary, detail) = parse_basic_consume(args);
223 (summary, detail)
224 }
225 (60, 21) => ("Basic.ConsumeOk".into(), "Basic.ConsumeOk".into()),
226 (60, 30) => ("Basic.Cancel".into(), "Basic.Cancel".into()),
227 (60, 31) => ("Basic.CancelOk".into(), "Basic.CancelOk".into()),
228 (60, 40) => {
229 let (summary, detail) = parse_basic_publish(args);
230 (summary, detail)
231 }
232 (60, 50) => ("Basic.Return".into(), "Basic.Return".into()),
233 (60, 60) => {
234 let (summary, detail) = parse_basic_deliver(args);
235 (summary, detail)
236 }
237 (60, 70) => ("Basic.Get".into(), parse_basic_get(args)),
238 (60, 71) => ("Basic.GetOk".into(), "Basic.GetOk".into()),
239 (60, 72) => ("Basic.GetEmpty".into(), "Basic.GetEmpty".into()),
240 (60, 80) => ("Basic.Ack".into(), "Basic.Ack".into()),
241 (60, 90) => ("Basic.Reject".into(), "Basic.Reject".into()),
242 (60, 120) => ("Basic.Nack".into(), "Basic.Nack".into()),
243 (85, 10) => ("Confirm.Select".into(), "Confirm.Select".into()),
245 (85, 11) => ("Confirm.SelectOk".into(), "Confirm.SelectOk".into()),
246 (90, 10) => ("Tx.Select".into(), "Tx.Select".into()),
248 (90, 11) => ("Tx.SelectOk".into(), "Tx.SelectOk".into()),
249 (90, 20) => ("Tx.Commit".into(), "Tx.Commit".into()),
250 (90, 21) => ("Tx.CommitOk".into(), "Tx.CommitOk".into()),
251 (90, 30) => ("Tx.Rollback".into(), "Tx.Rollback".into()),
252 (90, 31) => ("Tx.RollbackOk".into(), "Tx.RollbackOk".into()),
253 _ => {
254 let s = format!("Method({}.{})", class_id, method_id);
255 (s.clone(), s)
256 }
257 }
258}
259
260fn format_tune(args: &[u8]) -> String {
261 if args.len() < 8 { return "Connection.Tune".into(); }
262 let channel_max = u16::from_be_bytes([args[0], args[1]]);
263 let frame_max = u32::from_be_bytes([args[2], args[3], args[4], args[5]]);
264 let heartbeat = u16::from_be_bytes([args[6], args[7]]);
265 format!("channel_max={} frame_max={} heartbeat={}", channel_max, frame_max, heartbeat)
266}
267
268fn format_close(args: &[u8]) -> String {
269 if args.len() < 4 { return "Close".into(); }
270 let code = u16::from_be_bytes([args[0], args[1]]);
271 let reason = read_short_str(&args[2..]).map(|(s, _)| s).unwrap_or_default();
272 format!("code={} reason={}", code, reason)
273}
274
275fn parse_exchange_declare(args: &[u8]) -> (String, String) {
276 if args.len() < 3 { return (String::new(), "Exchange.Declare".into()); }
278 let rest = &args[2..]; let (exchange, consumed) = read_short_str(rest).unwrap_or_default();
280 let rest = &rest[consumed..];
281 let (ex_type, _) = read_short_str(rest).unwrap_or_default();
282 let detail = format!("Exchange.Declare exchange={} type={}", exchange, ex_type);
283 (exchange, detail)
284}
285
286fn parse_queue_declare(args: &[u8]) -> (String, String) {
287 if args.len() < 3 { return (String::new(), "Queue.Declare".into()); }
289 let rest = &args[2..];
290 let (queue, _) = read_short_str(rest).unwrap_or_default();
291 let detail = format!("Queue.Declare queue={}", queue);
292 (queue, detail)
293}
294
295fn parse_queue_declare_ok(args: &[u8]) -> (String, String) {
296 let (queue, consumed) = read_short_str(args).unwrap_or_default();
297 let rest = &args[consumed..];
298 let (msg_count, consumer_count) = if rest.len() >= 8 {
299 (u32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]),
300 u32::from_be_bytes([rest[4], rest[5], rest[6], rest[7]]))
301 } else { (0, 0) };
302 let summary = format!("Queue.DeclareOk {}", queue);
303 let detail = format!("Queue.DeclareOk queue={} messages={} consumers={}", queue, msg_count, consumer_count);
304 (summary, detail)
305}
306
307fn parse_queue_bind(args: &[u8]) -> (String, String) {
308 if args.len() < 3 { return ("Queue.Bind".into(), "Queue.Bind".into()); }
310 let rest = &args[2..];
311 let (queue, c1) = read_short_str(rest).unwrap_or_default();
312 let rest = &rest[c1..];
313 let (exchange, c2) = read_short_str(rest).unwrap_or_default();
314 let rest = &rest[c2..];
315 let (routing_key, _) = read_short_str(rest).unwrap_or_default();
316 let summary = format!("Queue.Bind {} → {} ({})", queue, exchange, routing_key);
317 let detail = format!("Queue.Bind queue={} exchange={} routing_key={}", queue, exchange, routing_key);
318 (detail, summary)
319}
320
321fn parse_basic_publish(args: &[u8]) -> (String, String) {
322 if args.len() < 3 { return ("Basic.Publish".into(), "Basic.Publish".into()); }
324 let rest = &args[2..];
325 let (exchange, c1) = read_short_str(rest).unwrap_or_default();
326 let rest = &rest[c1..];
327 let (routing_key, _) = read_short_str(rest).unwrap_or_default();
328 let ex = if exchange.is_empty() { "(default)" } else { &exchange };
329 let summary = format!("Basic.Publish → {} key={}", ex, routing_key);
330 let detail = format!("Basic.Publish exchange={} routing_key={}", ex, routing_key);
331 (summary, detail)
332}
333
334fn parse_basic_deliver(args: &[u8]) -> (String, String) {
335 let (consumer_tag, c1) = read_short_str(args).unwrap_or_default();
337 let rest = &args[c1..];
338 if rest.len() < 9 { return ("Basic.Deliver".into(), "Basic.Deliver".into()); }
339 let delivery_tag = u64::from_be_bytes([rest[0], rest[1], rest[2], rest[3], rest[4], rest[5], rest[6], rest[7]]);
340 let rest = &rest[9..]; let (exchange, c2) = read_short_str(rest).unwrap_or_default();
342 let rest = &rest[c2..];
343 let (routing_key, _) = read_short_str(rest).unwrap_or_default();
344 let ex = if exchange.is_empty() { "(default)" } else { &exchange };
345 let summary = format!("Basic.Deliver ← {} key={}", ex, routing_key);
346 let detail = format!("Basic.Deliver consumer={} delivery_tag={} exchange={} routing_key={}", consumer_tag, delivery_tag, exchange, routing_key);
347 (summary, detail)
348}
349
350fn parse_basic_consume(args: &[u8]) -> (String, String) {
351 if args.len() < 3 { return ("Basic.Consume".into(), "Basic.Consume".into()); }
353 let rest = &args[2..];
354 let (queue, c1) = read_short_str(rest).unwrap_or_default();
355 let rest = &rest[c1..];
356 let (consumer_tag, _) = read_short_str(rest).unwrap_or_default();
357 let summary = format!("Basic.Consume {}", queue);
358 let detail = format!("Basic.Consume queue={} consumer_tag={}", queue, consumer_tag);
359 (summary, detail)
360}
361
362fn parse_basic_get(args: &[u8]) -> String {
363 if args.len() < 3 { return "Basic.Get".into(); }
365 let rest = &args[2..];
366 let (queue, _) = read_short_str(rest).unwrap_or_default();
367 format!("Basic.Get queue={}", queue)
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 #[test]
375 fn test_parse_protocol_header() {
376 let buf = b"AMQP\x00\x00\x09\x01";
377 let frame = parse_amqp_frame(buf).unwrap();
378 assert_eq!(frame.method.unwrap().summary, "AMQP Protocol Header");
379 }
380
381 #[test]
382 fn test_parse_heartbeat() {
383 let buf = [8, 0, 0, 0, 0, 0, 0, 0xCE];
385 let frame = parse_amqp_frame(&buf).unwrap();
386 assert_eq!(frame.frame_type, FRAME_HEARTBEAT);
387 assert_eq!(frame.method.unwrap().summary, "Heartbeat");
388 }
389
390 #[test]
391 fn test_parse_basic_publish_frame() {
392 let mut buf = Vec::new();
395 buf.push(1); buf.extend_from_slice(&1u16.to_be_bytes()); let args: Vec<u8> = vec![
398 0, 0, 4, b't', b'e', b's', b't', 2, b'r', b'k', 0, ];
403 let payload_len = 4 + args.len();
404 buf.extend_from_slice(&(payload_len as u32).to_be_bytes()); buf.extend_from_slice(&60u16.to_be_bytes()); buf.extend_from_slice(&40u16.to_be_bytes()); buf.extend_from_slice(&args);
408 buf.push(0xCE);
409
410 let frame = parse_amqp_frame(&buf).unwrap();
411 let method = frame.method.unwrap();
412 assert!(method.summary.contains("Basic.Publish"));
413 assert!(method.summary.contains("test"));
414 assert!(method.summary.contains("rk"));
415 }
416}