#![cfg(test)]
use super::RespValue;
/// Command line recorded alongside the first decode log entry (as the
/// `exact_rch_command` field) in `audit_resp3_push_frame_interleaving_behavior`.
///
/// The string is never executed by this module. `${TMPDIR:-/tmp}` is kept as
/// literal text here; Rust performs no expansion on it.
/// NOTE(review): presumably this is the command a maintainer runs to reproduce
/// these tests through the `rch` wrapper — confirm against project tooling docs.
const REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND: &str = "rch exec -- env CARGO_TARGET_DIR=${TMPDIR:-/tmp}/rch_target_pane6_grpc_redis_resp3_push_interleaving_audit cargo test -p asupersync --lib redis_resp3_push_interleaving --features test-internals -- --nocapture";
/// Earlier form of the command without the `env CARGO_TARGET_DIR=...` prefix.
///
/// Kept only as a comparison value: the interleaving test asserts that the
/// current command contains `CARGO_TARGET_DIR=` and differs from this string.
const STALE_REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND: &str = "rch exec -- cargo test -p asupersync --lib redis_resp3_push_interleaving --features test-internals -- --nocapture";
/// Shared preamble called at the top of every test in this module.
///
/// `name` is forwarded unchanged to the crate's `test_phase!` macro; each test
/// passes its own function name.
fn init_test(name: &str) {
// NOTE(review): presumably installs the test log subscriber and is safe to
// call from every test — confirm in `crate::test_utils`.
crate::test_utils::init_test_logging();
// Runs after logging is initialised; presumably emits a phase marker for `name`.
crate::test_phase!(name);
}
/// Returns a short, fixed label naming the variant of `value`.
///
/// The label is used by the tests in this module as a structured log field
/// and inside panic messages. The payload of the variant is never inspected.
/// The match has no wildcard arm, so adding a variant to `RespValue` forces
/// this function to be updated.
fn frame_kind(value: &RespValue) -> &'static str {
    // Only the discriminant matters; every pattern ignores its payload, so
    // matching through the dereference moves nothing out of `value`.
    match *value {
        // Scalar and aggregate kinds listed first in the original ordering.
        RespValue::SimpleString(_) => "simple-string",
        RespValue::Error(_) => "error",
        RespValue::Integer(_) => "integer",
        RespValue::BulkString(_) => "bulk-string",
        RespValue::Array(_) => "array",

        // NOTE(review): the remaining kinds look like the RESP3 additions —
        // confirm against the protocol specification.
        RespValue::Null => "null",
        RespValue::Boolean(_) => "boolean",
        RespValue::Double(_) => "double",
        RespValue::BigNumber(_) => "big-number",
        RespValue::Verbatim { .. } => "verbatim",
        RespValue::BlobError(_) => "blob-error",
        RespValue::Map(_) => "map",
        RespValue::Set(_) => "set",
        RespValue::Push(_) => "push",
        RespValue::Attribute(_) => "attribute",
    }
}
/// Produces a compact, deterministic tag for `bytes`, used to label buffers
/// in log output.
///
/// The tag has the form `len<N>-crc<HHHHHHHH>`, where `N` is the byte count in
/// decimal and `HHHHHHHH` is an eight-digit lowercase hex digest. Despite the
/// `crc` label, the digest is a simple rotate-and-xor mix rather than a real
/// CRC: starting from zero, the accumulator is rotated left by five bits and
/// xor-ed with each byte in turn. An empty slice yields `len0-crc00000000`.
fn buffer_fingerprint(bytes: &[u8]) -> String {
    let mut digest: u32 = 0;
    for &byte in bytes {
        // Rotation (not a shift) keeps the high bits of earlier bytes in play.
        digest = digest.rotate_left(5) ^ u32::from(byte);
    }
    let length = bytes.len();
    format!("len{length}-crc{digest:08x}")
}
/// Audits decode order when a push frame sits ahead of a command reply in a
/// single buffer.
///
/// The buffer is assembled as an `invalidate` push frame immediately followed
/// by the integer reply `:1\r\n`. The test expects `RespValue::try_decode` to
/// return the push frame first (two items: the bulk string `invalidate` and a
/// one-element array), to leave exactly `:1\r\n` unconsumed, and then to decode
/// that remainder as `RespValue::Integer(1)`. Each decode step is logged with
/// its frame-kind label and a buffer fingerprint.
#[test]
fn audit_resp3_push_frame_interleaving_behavior() {
    init_test("audit_resp3_push_frame_interleaving_behavior");

    let ping_response_complete = b":1\r\n";
    let push_frame_complete = b">2\r\n$10\r\ninvalidate\r\n*1\r\n$3\r\nkey\r\n";

    // Push frame first, command reply second: the order under audit.
    let interleaved_buffer = [
        push_frame_complete.as_slice(),
        ping_response_complete.as_slice(),
    ]
    .concat();

    let first_outcome =
        RespValue::try_decode(&interleaved_buffer).expect("interleaved buffer should decode");
    let (first_decoded, first_consumed) = first_outcome.expect("should have complete frame");

    // The logged reproduction command must pin a target dir and must not be
    // the older form that lacked it.
    assert!(REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND.contains("CARGO_TARGET_DIR="));
    assert_ne!(
        REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND,
        STALE_REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND
    );

    tracing::info!(
        test_name = "audit_resp3_push_frame_interleaving_behavior",
        first_frame_kind = frame_kind(&first_decoded),
        first_consumed,
        buffer_fingerprint = %buffer_fingerprint(&interleaved_buffer),
        exact_rch_command = REDIS_RESP3_PUSH_INTERLEAVING_RCH_COMMAND,
        "redis RESP3 push interleaving first decode"
    );

    // The first frame must be the push; anything else fails with its kind.
    let push_items = match &first_decoded {
        RespValue::Push(items) => items,
        unexpected => panic!(
            "Protocol level should still decode push frames first, got {}: {:?}",
            frame_kind(unexpected),
            unexpected
        ),
    };
    assert_eq!(push_items.len(), 2);
    match (&push_items[0], &push_items[1]) {
        (RespValue::BulkString(Some(kind)), RespValue::Array(Some(keys))) => {
            assert_eq!(kind, b"invalidate");
            assert_eq!(keys.len(), 1);
        }
        _ => panic!("Unexpected push frame structure: {:?}", push_items),
    }

    // Whatever the first decode did not consume must be exactly the reply.
    let remaining_buffer = &interleaved_buffer[first_consumed..];
    assert_eq!(
        remaining_buffer, b":1\r\n",
        "PING response should follow push frame in buffer"
    );

    let second_outcome =
        RespValue::try_decode(remaining_buffer).expect("remaining buffer should decode");
    let (second_decoded, second_consumed) =
        second_outcome.expect("should have complete PING frame");

    tracing::info!(
        test_name = "audit_resp3_push_frame_interleaving_behavior",
        second_frame_kind = frame_kind(&second_decoded),
        second_consumed,
        remaining_fingerprint = %buffer_fingerprint(remaining_buffer),
        "redis RESP3 push interleaving second decode"
    );

    assert_eq!(
        second_decoded,
        RespValue::Integer(1),
        "PING response should decode correctly"
    );

    crate::test_complete!(
        "audit_resp3_push_frame_interleaving_behavior",
        first_consumed = first_consumed,
        second_consumed = second_consumed,
        downstream_frontier = "pending rch validation",
    );
}
/// Audits that command replies and push frames can be separated when they
/// arrive mixed in one buffer.
///
/// Five frames (three command replies and two push frames, alternating) are
/// concatenated and decoded one at a time with `RespValue::try_decode`,
/// advancing a cursor by the number of bytes each decode consumed. Push
/// payloads are collected apart from ordinary replies; the test then checks
/// the counts (3 replies, 2 pushes) and that the replies kept their order and
/// values.
#[test]
fn audit_command_client_push_frame_isolation() {
    init_test("audit_command_client_push_frame_isolation");

    let commands_and_pushes: [&[u8]; 5] = [
        b"+OK\r\n".as_slice(),
        b">2\r\n$10\r\ninvalidate\r\n*1\r\n$4\r\nkey1\r\n".as_slice(),
        b":42\r\n".as_slice(),
        b">3\r\n$10\r\nmonitoring\r\n+event\r\n:123\r\n".as_slice(),
        b"$5\r\nhello\r\n".as_slice(),
    ];
    let combined_buffer: Vec<u8> = commands_and_pushes.concat();

    let mut pos = 0;
    let mut responses = Vec::new();
    let mut push_frames = Vec::new();

    // One decode per input frame; `pos` always points at the next frame start.
    for frame_index in 0..commands_and_pushes.len() {
        let undecoded = &combined_buffer[pos..];
        let outcome = RespValue::try_decode(undecoded).expect("combined buffer should decode");
        let (decoded, consumed) = outcome.expect("should have complete frame");
        pos += consumed;

        tracing::info!(
            test_name = "audit_command_client_push_frame_isolation",
            frame_index,
            decoded_kind = frame_kind(&decoded),
            consumed,
            cursor = pos,
            combined_fingerprint = %buffer_fingerprint(&combined_buffer),
            "redis RESP3 command/push frame decode"
        );

        // Route push payloads away from the command-reply stream.
        match decoded {
            RespValue::Push(payload) => push_frames.push(payload),
            reply => responses.push(reply),
        }
    }

    assert_eq!(responses.len(), 3, "Should have 3 command responses");
    assert_eq!(push_frames.len(), 2, "Should have 2 push frames");

    assert_eq!(responses[0], RespValue::SimpleString("OK".to_string()));
    assert_eq!(responses[1], RespValue::Integer(42));
    assert_eq!(responses[2], RespValue::BulkString(Some(b"hello".to_vec())));

    crate::test_complete!(
        "audit_command_client_push_frame_isolation",
        response_count = responses.len(),
        push_count = push_frames.len(),
        bytes_consumed = pos,
        buffer_fingerprint = buffer_fingerprint(&combined_buffer),
    );
}
/// Records the reference policy string for handling push frames
/// (`skip-or-buffer-push-then-continue`) in the test log.
///
/// This test makes no assertions and decodes nothing: it emits one log event
/// and one completion record carrying the same policy and artifact strings.
/// NOTE(review): the artifact is labelled "in-process parser harness" but no
/// parser is invoked here — confirm this is an intentional documentation-only
/// placeholder.
#[test]
fn audit_reference_push_frame_buffering_pattern() {
init_test("audit_reference_push_frame_buffering_pattern");
// Log the expected policy so it appears next to the other audit records.
tracing::info!(
test_name = "audit_reference_push_frame_buffering_pattern",
expected_response_policy = "skip-or-buffer-push-then-continue",
artifact = "in-process parser harness",
"redis RESP3 reference buffering pattern"
);
// Completion record repeats the same policy/artifact values.
crate::test_complete!(
"audit_reference_push_frame_buffering_pattern",
response_policy = "skip-or-buffer-push-then-continue",
artifact = "in-process parser harness",
);
}