1use crate::backend::{BackendTag, HandleOpaque};
63use crate::engine_error::{EngineError, ValidationKind};
64use crate::types::{
65 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
66};
67
/// Leading byte of a v2 handle. `encode` writes it first, and `decode` uses it
/// to select the v2 header layout (magic, wire version, backend tag).
const V2_MAGIC: u8 = 0x02;
/// Wire-version byte that follows `V2_MAGIC`. `decode` rejects any other value
/// with `HandleDecodeError::BadWireVersion`.
const V2_WIRE_VERSION: u8 = 0x01;
/// Leading byte of the older v1 layout, which has no backend-tag byte: the
/// payload fields follow immediately and `decode` reports `BackendTag::Valkey`.
const V1_VERSION_TAG: u8 = 0x01;
75
/// The fields carried inside an encoded handle.
///
/// `write_fields` serialises them in declaration order and `read_fields`
/// reads them back in the same order. Identifier-like fields travel as
/// length-prefixed UTF-8 strings; numeric fields travel as little-endian
/// integers.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and has to go through [`HandlePayload::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct HandlePayload {
    /// Encoded as the string form produced by `to_string()`; decoded with
    /// `ExecutionId::parse`.
    pub execution_id: ExecutionId,
    /// Encoded as the little-endian bytes of the inner value; read back as a
    /// `u32`.
    pub attempt_index: AttemptIndex,
    /// Encoded as the string form produced by `to_string()`; decoded with
    /// `AttemptId::parse`.
    pub attempt_id: AttemptId,
    /// Encoded as the string form produced by `to_string()`; decoded with
    /// `LeaseId::parse`.
    pub lease_id: LeaseId,
    /// Encoded as the little-endian bytes of the inner value; read back as a
    /// `u64`.
    pub lease_epoch: LeaseEpoch,
    /// Encoded as a little-endian `u64`. Presumably the lease time-to-live in
    /// milliseconds (inferred from the name — confirm with the lease owner).
    pub lease_ttl_ms: u64,
    /// Encoded from `as_str()`; rebuilt with `LaneId::new` without further
    /// parsing.
    pub lane_id: LaneId,
    /// Encoded from `as_str()`; rebuilt with `WorkerInstanceId::new` without
    /// further parsing.
    pub worker_instance_id: WorkerInstanceId,
}
94
95impl HandlePayload {
96 #[allow(clippy::too_many_arguments)] pub fn new(
100 execution_id: ExecutionId,
101 attempt_index: AttemptIndex,
102 attempt_id: AttemptId,
103 lease_id: LeaseId,
104 lease_epoch: LeaseEpoch,
105 lease_ttl_ms: u64,
106 lane_id: LaneId,
107 worker_instance_id: WorkerInstanceId,
108 ) -> Self {
109 Self {
110 execution_id,
111 attempt_index,
112 attempt_id,
113 lease_id,
114 lease_epoch,
115 lease_ttl_ms,
116 lane_id,
117 worker_instance_id,
118 }
119 }
120}
121
/// The result of a successful `decode`: the backend the handle belongs to plus
/// the payload fields it carried.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate builds it via
/// [`DecodedHandle::new`] rather than a struct literal.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct DecodedHandle {
    /// Backend read from the v2 header, or `BackendTag::Valkey` when the
    /// handle used the v1 layout.
    pub tag: BackendTag,
    /// The decoded payload fields.
    pub payload: HandlePayload,
}
131
132impl DecodedHandle {
133 pub fn new(tag: BackendTag, payload: HandlePayload) -> Self {
134 Self { tag, payload }
135 }
136}
137
/// Reasons `decode` can reject a handle's bytes.
///
/// Converts into `EngineError::Validation` with `ValidationKind::Corruption`
/// through the `From` impl in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum HandleDecodeError {
    /// The buffer ended early: `needed` bytes were requested at offset `at`,
    /// but the whole buffer is only `have` bytes long.
    Truncated { needed: usize, at: usize, have: usize },
    /// All fields were read but bytes remain: decoding stopped at `pos` in a
    /// buffer of `len` bytes.
    TrailingBytes { pos: usize, len: usize },
    /// The byte after the v2 magic was not the supported wire version.
    BadWireVersion { got: u8 },
    /// The leading byte was neither the v2 magic nor the v1 version tag.
    BadV1Version { got: u8 },
    /// The v2 backend-tag byte did not map to a known `BackendTag`.
    BadBackendTag { got: u8 },
    /// The named string field's bytes were not valid UTF-8.
    InvalidUtf8 { field: &'static str },
    /// The named field was valid UTF-8 but its parser rejected it; `detail`
    /// holds the parser error rendered with `to_string()`.
    ParseField { field: &'static str, detail: String },
}
159
160impl std::fmt::Display for HandleDecodeError {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 HandleDecodeError::Truncated { needed, at, have } => write!(
164 f,
165 "truncated handle: needed {needed} bytes at offset {at}, have {have}"
166 ),
167 HandleDecodeError::TrailingBytes { pos, len } => {
168 write!(f, "trailing bytes in handle: pos={pos}, len={len}")
169 }
170 HandleDecodeError::BadWireVersion { got } => {
171 write!(f, "handle v2 wire-version byte {got:#x} not recognised")
172 }
173 HandleDecodeError::BadV1Version { got } => write!(
174 f,
175 "handle v1 version byte {got:#x} not recognised (expected {V1_VERSION_TAG:#x})"
176 ),
177 HandleDecodeError::BadBackendTag { got } => {
178 write!(f, "handle backend-tag byte {got:#x} not recognised")
179 }
180 HandleDecodeError::InvalidUtf8 { field } => {
181 write!(f, "handle field `{field}` is not valid UTF-8")
182 }
183 HandleDecodeError::ParseField { field, detail } => {
184 write!(f, "handle field `{field}` parse failed: {detail}")
185 }
186 }
187 }
188}
189
// No `source()` override: every variant holds plain data (integers, field
// names, a detail string) rather than an underlying error value.
impl std::error::Error for HandleDecodeError {}
191
192impl From<HandleDecodeError> for EngineError {
193 fn from(err: HandleDecodeError) -> Self {
194 EngineError::Validation {
195 kind: ValidationKind::Corruption,
196 detail: format!("handle_codec: {err}"),
197 }
198 }
199}
200
201pub fn encode(tag: BackendTag, payload: &HandlePayload) -> HandleOpaque {
205 let mut buf: Vec<u8> = Vec::with_capacity(256);
206 buf.push(V2_MAGIC);
207 buf.push(V2_WIRE_VERSION);
208 buf.push(tag.wire_byte());
209 write_fields(&mut buf, payload);
210 HandleOpaque::new(buf.into_boxed_slice())
211}
212
213pub fn decode(opaque: &HandleOpaque) -> Result<DecodedHandle, HandleDecodeError> {
218 let bytes = opaque.as_bytes();
219 let mut cur = Cursor::new(bytes);
220 let lead = cur.read_u8()?;
221 let tag = match lead {
222 V2_MAGIC => {
223 let wire_version = cur.read_u8()?;
224 if wire_version != V2_WIRE_VERSION {
225 return Err(HandleDecodeError::BadWireVersion { got: wire_version });
226 }
227 let tag_byte = cur.read_u8()?;
228 BackendTag::from_wire_byte(tag_byte)
229 .ok_or(HandleDecodeError::BadBackendTag { got: tag_byte })?
230 }
231 V1_VERSION_TAG => BackendTag::Valkey,
232 other => return Err(HandleDecodeError::BadV1Version { got: other }),
233 };
234 let payload = read_fields(&mut cur)?;
235 cur.expect_eof()?;
236 Ok(DecodedHandle { tag, payload })
237}
238
239fn write_fields(buf: &mut Vec<u8>, f: &HandlePayload) {
242 write_str(buf, &f.execution_id.to_string());
243 buf.extend_from_slice(&f.attempt_index.0.to_le_bytes());
244 write_str(buf, &f.attempt_id.to_string());
245 write_str(buf, &f.lease_id.to_string());
246 buf.extend_from_slice(&f.lease_epoch.0.to_le_bytes());
247 buf.extend_from_slice(&f.lease_ttl_ms.to_le_bytes());
248 write_str(buf, f.lane_id.as_str());
249 write_str(buf, f.worker_instance_id.as_str());
250}
251
252fn read_fields(cur: &mut Cursor<'_>) -> Result<HandlePayload, HandleDecodeError> {
253 let execution_id_str = cur.read_str("execution_id")?;
254 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
255 HandleDecodeError::ParseField {
256 field: "execution_id",
257 detail: e.to_string(),
258 }
259 })?;
260 let attempt_index = AttemptIndex::new(cur.read_u32()?);
261 let attempt_id_str = cur.read_str("attempt_id")?;
262 let attempt_id =
263 AttemptId::parse(&attempt_id_str).map_err(|e| HandleDecodeError::ParseField {
264 field: "attempt_id",
265 detail: e.to_string(),
266 })?;
267 let lease_id_str = cur.read_str("lease_id")?;
268 let lease_id = LeaseId::parse(&lease_id_str).map_err(|e| HandleDecodeError::ParseField {
269 field: "lease_id",
270 detail: e.to_string(),
271 })?;
272 let lease_epoch = LeaseEpoch(cur.read_u64()?);
273 let lease_ttl_ms = cur.read_u64()?;
274 let lane_id_str = cur.read_str("lane_id")?;
275 let lane_id = LaneId::new(lane_id_str);
276 let worker_str = cur.read_str("worker_instance_id")?;
277 let worker_instance_id = WorkerInstanceId::new(worker_str);
278 Ok(HandlePayload {
279 execution_id,
280 attempt_index,
281 attempt_id,
282 lease_id,
283 lease_epoch,
284 lease_ttl_ms,
285 lane_id,
286 worker_instance_id,
287 })
288}
289
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// string's UTF-8 bytes.
///
/// A string longer than `u32::MAX` bytes is cut to exactly `u32::MAX` bytes so
/// the length prefix always matches the number of bytes written.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    // Saturate the prefix; it doubles as the number of bytes actually emitted.
    let prefix = u32::try_from(raw.len()).unwrap_or(u32::MAX);
    buf.extend_from_slice(&prefix.to_le_bytes());
    buf.extend_from_slice(&raw[..prefix as usize]);
}
303
304struct Cursor<'a> {
305 bytes: &'a [u8],
306 pos: usize,
307}
308
309impl<'a> Cursor<'a> {
310 fn new(bytes: &'a [u8]) -> Self {
311 Self { bytes, pos: 0 }
312 }
313
314 fn take(&mut self, n: usize) -> Result<&'a [u8], HandleDecodeError> {
315 if self.pos + n > self.bytes.len() {
316 return Err(HandleDecodeError::Truncated {
317 needed: n,
318 at: self.pos,
319 have: self.bytes.len(),
320 });
321 }
322 let slice = &self.bytes[self.pos..self.pos + n];
323 self.pos += n;
324 Ok(slice)
325 }
326
327 fn read_u8(&mut self) -> Result<u8, HandleDecodeError> {
328 Ok(self.take(1)?[0])
329 }
330
331 fn read_u32(&mut self) -> Result<u32, HandleDecodeError> {
332 let mut b = [0u8; 4];
333 b.copy_from_slice(self.take(4)?);
334 Ok(u32::from_le_bytes(b))
335 }
336
337 fn read_u64(&mut self) -> Result<u64, HandleDecodeError> {
338 let mut b = [0u8; 8];
339 b.copy_from_slice(self.take(8)?);
340 Ok(u64::from_le_bytes(b))
341 }
342
343 fn read_str(&mut self, field: &'static str) -> Result<String, HandleDecodeError> {
344 let len = self.read_u32()? as usize;
345 let bytes = self.take(len)?;
346 String::from_utf8(bytes.to_vec())
347 .map_err(|_| HandleDecodeError::InvalidUtf8 { field })
348 }
349
350 fn expect_eof(&self) -> Result<(), HandleDecodeError> {
351 if self.pos != self.bytes.len() {
352 return Err(HandleDecodeError::TrailingBytes {
353 pos: self.pos,
354 len: self.bytes.len(),
355 });
356 }
357 Ok(())
358 }
359}
360
#[cfg(test)]
mod tests {
    //! Round-trip and rejection tests for the handle codec.

    use super::*;
    use crate::partition::PartitionConfig;

    /// A fully populated payload shared by the round-trip tests.
    fn sample_payload() -> HandlePayload {
        HandlePayload {
            execution_id: ExecutionId::solo(
                &LaneId::new("default"),
                &PartitionConfig::default(),
            ),
            attempt_index: AttemptIndex::new(3),
            attempt_id: AttemptId::new(),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch(7),
            lease_ttl_ms: 30_000,
            lane_id: LaneId::new("default"),
            worker_instance_id: WorkerInstanceId::new("worker-1"),
        }
    }

    #[test]
    fn round_trip_valkey_v2() {
        let p = sample_payload();
        let opaque = encode(BackendTag::Valkey, &p);
        // The three header bytes are magic, wire version, backend tag.
        assert_eq!(opaque.as_bytes()[0], V2_MAGIC);
        assert_eq!(opaque.as_bytes()[1], V2_WIRE_VERSION);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Valkey.wire_byte());
        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn round_trip_postgres_v2() {
        let p = sample_payload();
        let opaque = encode(BackendTag::Postgres, &p);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Postgres.wire_byte());
        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Postgres);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn old_v1_format_decodes_as_valkey() {
        let p = sample_payload();
        // v1 layout: a single version byte followed directly by the fields.
        let mut buf: Vec<u8> = Vec::new();
        buf.push(V1_VERSION_TAG);
        write_fields(&mut buf, &p);
        let opaque = HandleOpaque::new(buf.into_boxed_slice());
        let decoded = decode(&opaque).expect("v1 compat decode");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn truncated_handle_rejected() {
        // Only the magic byte: the wire-version read runs off the end.
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::Truncated { .. }));
    }

    #[test]
    fn bad_v2_wire_version_rejected() {
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC, 0xFF, 0x01]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadWireVersion { got: 0xFF }));
    }

    #[test]
    fn bad_backend_tag_rejected() {
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC, V2_WIRE_VERSION, 0xFE]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadBackendTag { got: 0xFE }));
    }

    #[test]
    fn bad_leading_byte_rejected() {
        let opaque = HandleOpaque::new(Box::new([0xAB]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadV1Version { got: 0xAB }));
    }

    #[test]
    fn trailing_bytes_rejected() {
        // A valid handle with one extra byte appended must not decode.
        let mut bytes = encode(BackendTag::Valkey, &sample_payload())
            .as_bytes()
            .to_vec();
        bytes.push(0x00);
        let opaque = HandleOpaque::new(bytes.into_boxed_slice());
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::TrailingBytes { .. }));
    }

    #[test]
    fn invalid_utf8_string_rejected() {
        // v1 header, then a one-byte string whose only byte is not UTF-8.
        let mut buf: Vec<u8> = vec![V1_VERSION_TAG];
        buf.extend_from_slice(&1u32.to_le_bytes());
        buf.push(0xFF);
        let opaque = HandleOpaque::new(buf.into_boxed_slice());
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(
            err,
            HandleDecodeError::InvalidUtf8 {
                field: "execution_id"
            }
        ));
    }

    #[test]
    fn decode_error_maps_to_validation_corruption() {
        let err: EngineError = HandleDecodeError::Truncated {
            needed: 4,
            at: 1,
            have: 1,
        }
        .into();
        match err {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::Corruption);
                assert!(detail.contains("handle_codec"));
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}