1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// SPDX-License-Identifier: Apache-2.0
// Chaos scenarios value a single readable step-by-step `fn`. Splitting
// these into sub-helpers would obscure the synthetic frame sequence the
// test pins. We accept the line count.
#![allow(clippy::too_many_lines)]
//! Chaos scenario: `ack_timeout` on a consumer's
//! [`UnackedMessageTracker`](magnetar_proto::trackers::unacked) must fire at
//! the configured virtual deadline — not the host wall-clock.
//!
//! Why this is moonpool territory: same as the send-timeout sibling
//! ([`virtual_clock_send_timeout`](crate::common)) — `testcontainers`
//! cannot drive a fast deterministic deadline; only synthetic [`Instant`]s
//! can pin the boundary condition.
//!
//! ## Shape
//!
//! 1. Subscribe a consumer with `ack_timeout = 10s`.
//! 2. Feed a synthetic broker `CommandMessage` + payload back to the state machine at virtual t0.
//! The unacked-tracker records it.
//! 3. Tick at `t0 + 9.9s` — no redelivery yet, tracker still holds the id.
//! 4. Tick at `t0 + 10.5s` — the proto layer emits a [`pb::CommandRedeliverUnacknowledgedMessages`]
//!    frame on the outbound queue, addressed to the consumer + carrying the timed-out message id.
mod common;
use std::time::{Duration, Instant};
use bytes::{Bytes, BytesMut};
use magnetar_proto::{ConnectionConfig, SubscribeRequest, encode_command, encode_payload, pb};
use magnetar_runtime_moonpool::ConnectionShared;
use crate::common::handshake_response_bytes;
/// Ack-timeout configured on the test subscription. The scenario ticks the
/// connection on either side of `t0 + ACK_TIMEOUT` to pin the deadline.
const ACK_TIMEOUT: Duration = Duration::from_secs(10);
/// Pins the consumer ack-timeout to the `Instant` handed to `handle_timeout`.
///
/// The scenario completes the handshake and a subscribe round-trip at `t0`,
/// delivers one synthetic message, then ticks the connection shortly before
/// and shortly after `t0 + ACK_TIMEOUT`. Only the second tick may queue a
/// `CommandRedeliverUnacknowledgedMessages` frame, and that frame must name
/// the consumer and the id of the message that was delivered.
#[test]
fn ack_timeout_fires_at_virtual_deadline() {
    let t0 = Instant::now();
    // Both tick instants are derived from `ACK_TIMEOUT` so that editing the
    // constant cannot silently move the test off the boundary it pins:
    // 100ms short of the deadline, and 500ms past it.
    let before_deadline = t0 + (ACK_TIMEOUT - Duration::from_millis(100));
    let after_deadline = t0 + ACK_TIMEOUT + Duration::from_millis(500);

    let shared = ConnectionShared::new(ConnectionConfig::default());

    // Drive the handshake at virtual t0. Each lock is taken in its own scope
    // so no guard outlives the step that needs it.
    {
        let mut conn = shared.inner.lock();
        conn.begin_handshake().expect("handshake");
        let connected = handshake_response_bytes();
        conn.handle_bytes(t0, &connected).expect("Connected");
        let _ = conn.poll_event();
    }

    // Open a subscription with the ack-timeout knob set. The request id is
    // peeked *before* `subscribe` so the synthetic `CommandSuccess` below can
    // be addressed to the subscribe request.
    let req = SubscribeRequest {
        topic: "persistent://public/default/ack-timeout".to_owned(),
        subscription: "magnetar-test-ack-timeout".to_owned(),
        sub_type: pb::command_subscribe::SubType::Exclusive,
        ack_timeout: Some(ACK_TIMEOUT),
        ..Default::default()
    };
    let (handle, subscribe_request_id) = {
        let mut conn = shared.inner.lock();
        let request_id = conn.peek_next_request_id_for_test();
        let handle = conn.subscribe(req);
        (handle, request_id)
    };

    // Ack the subscribe so the consumer is past the open round-trip.
    {
        let success = pb::BaseCommand {
            r#type: pb::base_command::Type::Success as i32,
            success: Some(pb::CommandSuccess {
                request_id: subscribe_request_id,
                schema: None,
            }),
            ..Default::default()
        };
        let mut buf = BytesMut::new();
        encode_command(&mut buf, &success).expect("encode CommandSuccess");
        let mut conn = shared.inner.lock();
        conn.handle_bytes(t0, &buf).expect("Success");
        let _ = conn.poll_event();
    }

    // Identity of the synthetic message. The ledger/entry pair is captured
    // here, before the id is moved into the command, so the assertions on the
    // redeliver frame compare against what was actually sent.
    let message_id = pb::MessageIdData {
        ledger_id: 7,
        entry_id: 3,
        partition: None,
        batch_index: None,
        ack_set: vec![],
        batch_size: None,
        first_chunk_message_id: None,
    };
    let (sent_ledger_id, sent_entry_id) = (message_id.ledger_id, message_id.entry_id);

    // Feed a synthetic incoming message. The broker frame is
    // `CommandMessage` followed by a `MessageMetadata`-prefixed payload.
    let msg_cmd = pb::BaseCommand {
        r#type: pb::base_command::Type::Message as i32,
        message: Some(pb::CommandMessage {
            consumer_id: handle.0,
            message_id,
            redelivery_count: Some(0),
            ack_set: vec![],
            consumer_epoch: None,
        }),
        ..Default::default()
    };
    let metadata = pb::MessageMetadata {
        producer_name: "magnetar-test-prod".to_owned(),
        sequence_id: 1,
        publish_time: 0,
        ..Default::default()
    };
    let payload = b"unacked-payload";
    let mut frame = BytesMut::new();
    encode_payload(&mut frame, &msg_cmd, &metadata, payload).expect("encode message frame");
    {
        let mut conn = shared.inner.lock();
        conn.handle_bytes(t0, &frame).expect("deliver message");
    }

    // The state machine emits a `ConnectionEvent::Message` and drains the
    // consumer queue inline during `handle_bytes`. The unacked-tracker is
    // populated at `deliver` time (not on pop), so by the time we reach
    // here the timer is already armed for the synthetic message id.
    let mut saw_msg = false;
    {
        let mut conn = shared.inner.lock();
        while let Some(evt) = conn.poll_event() {
            if let magnetar_proto::ConnectionEvent::Message { message, .. } = evt {
                assert_eq!(message.payload, Bytes::from_static(payload));
                saw_msg = true;
            }
        }
    }
    assert!(
        saw_msg,
        "expected a Message event for the delivered payload"
    );

    // Drain the outbound bytes the consumer state machine queued (initial
    // `CommandFlow`, etc) so we can later observe the redeliver-unacked
    // frame in isolation.
    {
        let mut conn = shared.inner.lock();
        let _ = conn.poll_transmit();
    }

    // Tick before the deadline. Tracker still holds the id; no redeliver
    // frame on the wire.
    {
        let mut conn = shared.inner.lock();
        conn.handle_timeout(before_deadline);
        let tx = conn.poll_transmit();
        assert!(
            tx.is_empty(),
            "no redeliver-unacked frame should be queued before the virtual deadline"
        );
    }

    // Tick after the deadline. The state machine emits a
    // `CommandRedeliverUnacknowledgedMessages` frame with the timed-out id.
    {
        let mut conn = shared.inner.lock();
        conn.handle_timeout(after_deadline);
        let mut src = conn.poll_transmit();
        assert!(
            !src.is_empty(),
            "ack-timeout sweep at virtual deadline must queue a redeliver-unacked frame"
        );

        // Decode the queued frame to confirm it is the expected
        // `CommandRedeliverUnacknowledgedMessages` for the right consumer.
        let frame = magnetar_proto::decode_one(&mut src).expect("decode redeliver frame");
        assert_eq!(
            frame.command.r#type,
            pb::base_command::Type::RedeliverUnacknowledgedMessages as i32,
            "expected RedeliverUnacknowledgedMessages, got {:?}",
            frame.command.r#type,
        );
        let redeliver = frame
            .command
            .redeliver_unacknowledged_messages
            .expect("redeliver body present");
        assert_eq!(redeliver.consumer_id, handle.0);
        assert_eq!(
            redeliver.message_ids.len(),
            1,
            "expected one timed-out message id, got {:?}",
            redeliver.message_ids,
        );
        assert_eq!(redeliver.message_ids[0].ledger_id, sent_ledger_id);
        assert_eq!(redeliver.message_ids[0].entry_id, sent_entry_id);
    }
}