1use crate::backend::{BackendTag, HandleOpaque};
63use crate::engine_error::{EngineError, ValidationKind};
64use crate::types::{
65 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
66};
67
/// Leading byte of a v2 handle; `decode` uses it to tell v2 from legacy v1.
const V2_MAGIC: u8 = 0x02;
/// Wire-format revision, written as the second byte of every v2 handle.
const V2_WIRE_VERSION: u8 = 0x01;
/// Leading byte of a legacy v1 handle, which carries no backend-tag byte.
const V1_VERSION_TAG: u8 = 0x01;
75
76#[derive(Clone, Debug, PartialEq, Eq)]
83#[non_exhaustive]
84pub struct HandlePayload {
85 pub execution_id: ExecutionId,
86 pub attempt_index: AttemptIndex,
87 pub attempt_id: AttemptId,
88 pub lease_id: LeaseId,
89 pub lease_epoch: LeaseEpoch,
90 pub lease_ttl_ms: u64,
91 pub lane_id: LaneId,
92 pub worker_instance_id: WorkerInstanceId,
93}
94
95impl HandlePayload {
96 #[allow(clippy::too_many_arguments)] pub fn new(
100 execution_id: ExecutionId,
101 attempt_index: AttemptIndex,
102 attempt_id: AttemptId,
103 lease_id: LeaseId,
104 lease_epoch: LeaseEpoch,
105 lease_ttl_ms: u64,
106 lane_id: LaneId,
107 worker_instance_id: WorkerInstanceId,
108 ) -> Self {
109 Self {
110 execution_id,
111 attempt_index,
112 attempt_id,
113 lease_id,
114 lease_epoch,
115 lease_ttl_ms,
116 lane_id,
117 worker_instance_id,
118 }
119 }
120}
121
122#[derive(Clone, Debug, PartialEq, Eq)]
126#[non_exhaustive]
127pub struct DecodedHandle {
128 pub tag: BackendTag,
129 pub payload: HandlePayload,
130}
131
132impl DecodedHandle {
133 pub fn new(tag: BackendTag, payload: HandlePayload) -> Self {
134 Self { tag, payload }
135 }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq)]
142#[non_exhaustive]
143pub enum HandleDecodeError {
144 Truncated { needed: usize, at: usize, have: usize },
146 TrailingBytes { pos: usize, len: usize },
148 BadWireVersion { got: u8 },
150 BadV1Version { got: u8 },
152 BadBackendTag { got: u8 },
154 InvalidUtf8 { field: &'static str },
156 ParseField { field: &'static str, detail: String },
158}
159
160impl std::fmt::Display for HandleDecodeError {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 HandleDecodeError::Truncated { needed, at, have } => write!(
164 f,
165 "truncated handle: needed {needed} bytes at offset {at}, have {have}"
166 ),
167 HandleDecodeError::TrailingBytes { pos, len } => {
168 write!(f, "trailing bytes in handle: pos={pos}, len={len}")
169 }
170 HandleDecodeError::BadWireVersion { got } => {
171 write!(f, "handle v2 wire-version byte {got:#x} not recognised")
172 }
173 HandleDecodeError::BadV1Version { got } => write!(
174 f,
175 "handle v1 version byte {got:#x} not recognised (expected {V1_VERSION_TAG:#x})"
176 ),
177 HandleDecodeError::BadBackendTag { got } => {
178 write!(f, "handle backend-tag byte {got:#x} not recognised")
179 }
180 HandleDecodeError::InvalidUtf8 { field } => {
181 write!(f, "handle field `{field}` is not valid UTF-8")
182 }
183 HandleDecodeError::ParseField { field, detail } => {
184 write!(f, "handle field `{field}` parse failed: {detail}")
185 }
186 }
187 }
188}
189
190impl std::error::Error for HandleDecodeError {}
191
192impl From<HandleDecodeError> for EngineError {
193 fn from(err: HandleDecodeError) -> Self {
194 EngineError::Validation {
195 kind: ValidationKind::Corruption,
196 detail: format!("handle_codec: {err}"),
197 }
198 }
199}
200
201pub fn encode(tag: BackendTag, payload: &HandlePayload) -> HandleOpaque {
205 let mut buf: Vec<u8> = Vec::with_capacity(256);
206 buf.push(V2_MAGIC);
207 buf.push(V2_WIRE_VERSION);
208 buf.push(tag.wire_byte());
209 write_fields(&mut buf, payload);
210 HandleOpaque::new(buf.into_boxed_slice())
211}
212
213pub fn decode(opaque: &HandleOpaque) -> Result<DecodedHandle, HandleDecodeError> {
218 let bytes = opaque.as_bytes();
219 let mut cur = Cursor::new(bytes);
220 let lead = cur.read_u8()?;
221 let tag = match lead {
222 V2_MAGIC => {
223 let wire_version = cur.read_u8()?;
224 if wire_version != V2_WIRE_VERSION {
225 return Err(HandleDecodeError::BadWireVersion { got: wire_version });
226 }
227 let tag_byte = cur.read_u8()?;
228 BackendTag::from_wire_byte(tag_byte)
229 .ok_or(HandleDecodeError::BadBackendTag { got: tag_byte })?
230 }
231 V1_VERSION_TAG => BackendTag::Valkey,
232 other => return Err(HandleDecodeError::BadV1Version { got: other }),
233 };
234 let payload = read_fields(&mut cur)?;
235 cur.expect_eof()?;
236 Ok(DecodedHandle { tag, payload })
237}
238
/// Builds the raw bytes of a legacy v1 handle for `payload`: the v1 version
/// tag followed directly by the payload fields, with no backend-tag byte.
///
/// Only compiled with the `test-fixtures` feature.
#[cfg(feature = "test-fixtures")]
pub fn v1_handle_for_tests(payload: &HandlePayload) -> Vec<u8> {
    let mut legacy: Vec<u8> = Vec::with_capacity(256);
    legacy.push(V1_VERSION_TAG);
    write_fields(&mut legacy, payload);
    legacy
}
262
263fn write_fields(buf: &mut Vec<u8>, f: &HandlePayload) {
266 write_str(buf, &f.execution_id.to_string());
267 buf.extend_from_slice(&f.attempt_index.0.to_le_bytes());
268 write_str(buf, &f.attempt_id.to_string());
269 write_str(buf, &f.lease_id.to_string());
270 buf.extend_from_slice(&f.lease_epoch.0.to_le_bytes());
271 buf.extend_from_slice(&f.lease_ttl_ms.to_le_bytes());
272 write_str(buf, f.lane_id.as_str());
273 write_str(buf, f.worker_instance_id.as_str());
274}
275
276fn read_fields(cur: &mut Cursor<'_>) -> Result<HandlePayload, HandleDecodeError> {
277 let execution_id_str = cur.read_str("execution_id")?;
278 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
279 HandleDecodeError::ParseField {
280 field: "execution_id",
281 detail: e.to_string(),
282 }
283 })?;
284 let attempt_index = AttemptIndex::new(cur.read_u32()?);
285 let attempt_id_str = cur.read_str("attempt_id")?;
286 let attempt_id =
287 AttemptId::parse(&attempt_id_str).map_err(|e| HandleDecodeError::ParseField {
288 field: "attempt_id",
289 detail: e.to_string(),
290 })?;
291 let lease_id_str = cur.read_str("lease_id")?;
292 let lease_id = LeaseId::parse(&lease_id_str).map_err(|e| HandleDecodeError::ParseField {
293 field: "lease_id",
294 detail: e.to_string(),
295 })?;
296 let lease_epoch = LeaseEpoch(cur.read_u64()?);
297 let lease_ttl_ms = cur.read_u64()?;
298 let lane_id_str = cur.read_str("lane_id")?;
299 let lane_id = LaneId::new(lane_id_str);
300 let worker_str = cur.read_str("worker_instance_id")?;
301 let worker_instance_id = WorkerInstanceId::new(worker_str);
302 Ok(HandlePayload {
303 execution_id,
304 attempt_index,
305 attempt_id,
306 lease_id,
307 lease_epoch,
308 lease_ttl_ms,
309 lane_id,
310 worker_instance_id,
311 })
312}
313
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// UTF-8 bytes.
///
/// A string longer than `u32::MAX` bytes is cut to its first `u32::MAX`
/// bytes and the length prefix saturates at `u32::MAX`, so the prefix always
/// matches the number of bytes written.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    // Saturate the prefix and clamp the body to the same length.
    let prefix = u32::try_from(raw.len()).unwrap_or(u32::MAX);
    let kept = raw.len().min(u32::MAX as usize);
    buf.extend_from_slice(&prefix.to_le_bytes());
    buf.extend_from_slice(&raw[..kept]);
}
327
328struct Cursor<'a> {
329 bytes: &'a [u8],
330 pos: usize,
331}
332
333impl<'a> Cursor<'a> {
334 fn new(bytes: &'a [u8]) -> Self {
335 Self { bytes, pos: 0 }
336 }
337
338 fn take(&mut self, n: usize) -> Result<&'a [u8], HandleDecodeError> {
339 if self.pos + n > self.bytes.len() {
340 return Err(HandleDecodeError::Truncated {
341 needed: n,
342 at: self.pos,
343 have: self.bytes.len(),
344 });
345 }
346 let slice = &self.bytes[self.pos..self.pos + n];
347 self.pos += n;
348 Ok(slice)
349 }
350
351 fn read_u8(&mut self) -> Result<u8, HandleDecodeError> {
352 Ok(self.take(1)?[0])
353 }
354
355 fn read_u32(&mut self) -> Result<u32, HandleDecodeError> {
356 let mut b = [0u8; 4];
357 b.copy_from_slice(self.take(4)?);
358 Ok(u32::from_le_bytes(b))
359 }
360
361 fn read_u64(&mut self) -> Result<u64, HandleDecodeError> {
362 let mut b = [0u8; 8];
363 b.copy_from_slice(self.take(8)?);
364 Ok(u64::from_le_bytes(b))
365 }
366
367 fn read_str(&mut self, field: &'static str) -> Result<String, HandleDecodeError> {
368 let len = self.read_u32()? as usize;
369 let bytes = self.take(len)?;
370 String::from_utf8(bytes.to_vec())
371 .map_err(|_| HandleDecodeError::InvalidUtf8 { field })
372 }
373
374 fn expect_eof(&self) -> Result<(), HandleDecodeError> {
375 if self.pos != self.bytes.len() {
376 return Err(HandleDecodeError::TrailingBytes {
377 pos: self.pos,
378 len: self.bytes.len(),
379 });
380 }
381 Ok(())
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::PartitionConfig;

    /// Builds a payload with every field populated.
    fn sample_payload() -> HandlePayload {
        let execution_id =
            ExecutionId::solo(&LaneId::new("default"), &PartitionConfig::default());
        HandlePayload {
            execution_id,
            attempt_index: AttemptIndex::new(3),
            attempt_id: AttemptId::new(),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch(7),
            lease_ttl_ms: 30_000,
            lane_id: LaneId::new("default"),
            worker_instance_id: WorkerInstanceId::new("worker-1"),
        }
    }

    /// Wraps raw bytes in a `HandleOpaque`.
    fn opaque_from(bytes: &[u8]) -> HandleOpaque {
        HandleOpaque::new(bytes.to_vec().into_boxed_slice())
    }

    /// Decodes `bytes` and returns the error; panics if decoding succeeds.
    fn decode_failure(bytes: &[u8]) -> HandleDecodeError {
        decode(&opaque_from(bytes)).unwrap_err()
    }

    #[test]
    fn round_trip_valkey_v2() {
        let payload = sample_payload();
        let opaque = encode(BackendTag::Valkey, &payload);

        // Header bytes: magic, wire version, backend tag.
        let wire = opaque.as_bytes();
        assert_eq!(wire[0], V2_MAGIC);
        assert_eq!(wire[1], V2_WIRE_VERSION);
        assert_eq!(wire[2], BackendTag::Valkey.wire_byte());

        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, payload);
    }

    #[test]
    fn round_trip_postgres_v2() {
        let payload = sample_payload();
        let opaque = encode(BackendTag::Postgres, &payload);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Postgres.wire_byte());

        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Postgres);
        assert_eq!(decoded.payload, payload);
    }

    #[test]
    fn old_v1_format_decodes_as_valkey() {
        let payload = sample_payload();
        // Legacy layout: version tag, then the fields, with no backend byte.
        let mut raw: Vec<u8> = vec![V1_VERSION_TAG];
        write_fields(&mut raw, &payload);

        let opaque = HandleOpaque::new(raw.into_boxed_slice());
        let decoded = decode(&opaque).expect("v1 compat decode");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, payload);
    }

    #[test]
    fn truncated_handle_rejected() {
        // Only the magic byte: the wire-version byte is missing.
        let err = decode_failure(&[V2_MAGIC]);
        assert!(matches!(err, HandleDecodeError::Truncated { .. }));
    }

    #[test]
    fn bad_v2_wire_version_rejected() {
        let err = decode_failure(&[V2_MAGIC, 0xFF, 0x01]);
        assert!(matches!(err, HandleDecodeError::BadWireVersion { got: 0xFF }));
    }

    #[test]
    fn bad_backend_tag_rejected() {
        let err = decode_failure(&[V2_MAGIC, V2_WIRE_VERSION, 0xFE]);
        assert!(matches!(err, HandleDecodeError::BadBackendTag { got: 0xFE }));
    }

    #[test]
    fn bad_leading_byte_rejected() {
        let err = decode_failure(&[0xAB]);
        assert!(matches!(err, HandleDecodeError::BadV1Version { got: 0xAB }));
    }

    #[cfg(feature = "test-fixtures")]
    #[test]
    fn v1_handle_for_tests_round_trip() {
        let payload = sample_payload();
        let fixture = super::v1_handle_for_tests(&payload);
        assert_eq!(fixture[0], V1_VERSION_TAG);

        // The fixture must be byte-identical to a hand-built v1 handle.
        let mut expected: Vec<u8> = vec![V1_VERSION_TAG];
        write_fields(&mut expected, &payload);
        assert_eq!(fixture, expected);

        let opaque = HandleOpaque::new(fixture.into_boxed_slice());
        let decoded = decode(&opaque).expect("v1 fixture decodes");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, payload);
    }

    #[test]
    fn decode_error_maps_to_validation_corruption() {
        let source = HandleDecodeError::Truncated {
            needed: 4,
            at: 1,
            have: 1,
        };
        match EngineError::from(source) {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::Corruption);
                assert!(detail.contains("handle_codec"));
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}