1use crate::backend::{BackendTag, HandleOpaque};
64use crate::engine_error::{EngineError, ValidationKind};
65use crate::types::{
66 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
67};
68
/// Lead byte that marks a v2 handle: `encode` writes it first and `decode`
/// dispatches on it.
const V2_MAGIC: u8 = 0x02;
/// Wire-version byte that follows `V2_MAGIC` in a v2 handle; `decode` rejects
/// any other value with `HandleDecodeError::BadWireVersion`.
const V2_WIRE_VERSION: u8 = 0x01;
/// Lead byte of the older v1 layout. A v1 handle carries no backend-tag byte,
/// and `decode` reports such handles as `BackendTag::Valkey`.
const V1_VERSION_TAG: u8 = 0x01;
76
77#[derive(Clone, Debug, PartialEq, Eq)]
84#[non_exhaustive]
85pub struct HandlePayload {
86 pub execution_id: ExecutionId,
87 pub attempt_index: AttemptIndex,
88 pub attempt_id: AttemptId,
89 pub lease_id: LeaseId,
90 pub lease_epoch: LeaseEpoch,
91 pub lease_ttl_ms: u64,
92 pub lane_id: LaneId,
93 pub worker_instance_id: WorkerInstanceId,
94}
95
96impl HandlePayload {
97 #[allow(clippy::too_many_arguments)] pub fn new(
101 execution_id: ExecutionId,
102 attempt_index: AttemptIndex,
103 attempt_id: AttemptId,
104 lease_id: LeaseId,
105 lease_epoch: LeaseEpoch,
106 lease_ttl_ms: u64,
107 lane_id: LaneId,
108 worker_instance_id: WorkerInstanceId,
109 ) -> Self {
110 Self {
111 execution_id,
112 attempt_index,
113 attempt_id,
114 lease_id,
115 lease_epoch,
116 lease_ttl_ms,
117 lane_id,
118 worker_instance_id,
119 }
120 }
121}
122
123#[derive(Clone, Debug, PartialEq, Eq)]
127#[non_exhaustive]
128pub struct DecodedHandle {
129 pub tag: BackendTag,
130 pub payload: HandlePayload,
131}
132
133impl DecodedHandle {
134 pub fn new(tag: BackendTag, payload: HandlePayload) -> Self {
135 Self { tag, payload }
136 }
137}
138
/// Reasons `decode` can reject the bytes of a handle.
///
/// Converts into `EngineError::Validation` with `ValidationKind::Corruption`
/// through the `From` impl in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum HandleDecodeError {
    /// A read asked for `needed` bytes at offset `at`, but the input ends
    /// first. `have` is the total input length, not the number of bytes left.
    Truncated { needed: usize, at: usize, have: usize },
    /// Every field was read but the input continues: the cursor stopped at
    /// `pos` while the input is `len` bytes long.
    TrailingBytes { pos: usize, len: usize },
    /// The byte following `V2_MAGIC` was not `V2_WIRE_VERSION`.
    BadWireVersion { got: u8 },
    /// The lead byte was neither `V2_MAGIC` nor `V1_VERSION_TAG`.
    BadV1Version { got: u8 },
    /// `BackendTag::from_wire_byte` did not recognise the v2 backend-tag byte.
    BadBackendTag { got: u8 },
    /// The bytes of the named string field were not valid UTF-8.
    InvalidUtf8 { field: &'static str },
    /// The named field was valid UTF-8 but its typed parser rejected it;
    /// `detail` holds the parser's error rendered as text.
    ParseField { field: &'static str, detail: String },
}
160
161impl std::fmt::Display for HandleDecodeError {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 HandleDecodeError::Truncated { needed, at, have } => write!(
165 f,
166 "truncated handle: needed {needed} bytes at offset {at}, have {have}"
167 ),
168 HandleDecodeError::TrailingBytes { pos, len } => {
169 write!(f, "trailing bytes in handle: pos={pos}, len={len}")
170 }
171 HandleDecodeError::BadWireVersion { got } => {
172 write!(f, "handle v2 wire-version byte {got:#x} not recognised")
173 }
174 HandleDecodeError::BadV1Version { got } => write!(
175 f,
176 "handle v1 version byte {got:#x} not recognised (expected {V1_VERSION_TAG:#x})"
177 ),
178 HandleDecodeError::BadBackendTag { got } => {
179 write!(f, "handle backend-tag byte {got:#x} not recognised")
180 }
181 HandleDecodeError::InvalidUtf8 { field } => {
182 write!(f, "handle field `{field}` is not valid UTF-8")
183 }
184 HandleDecodeError::ParseField { field, detail } => {
185 write!(f, "handle field `{field}` parse failed: {detail}")
186 }
187 }
188 }
189}
190
191impl std::error::Error for HandleDecodeError {}
192
193impl From<HandleDecodeError> for EngineError {
194 fn from(err: HandleDecodeError) -> Self {
195 EngineError::Validation {
196 kind: ValidationKind::Corruption,
197 detail: format!("handle_codec: {err}"),
198 }
199 }
200}
201
202pub fn encode(tag: BackendTag, payload: &HandlePayload) -> HandleOpaque {
206 let mut buf: Vec<u8> = Vec::with_capacity(256);
207 buf.push(V2_MAGIC);
208 buf.push(V2_WIRE_VERSION);
209 buf.push(tag.wire_byte());
210 write_fields(&mut buf, payload);
211 HandleOpaque::new(buf.into_boxed_slice())
212}
213
214pub fn decode(opaque: &HandleOpaque) -> Result<DecodedHandle, HandleDecodeError> {
219 let bytes = opaque.as_bytes();
220 let mut cur = Cursor::new(bytes);
221 let lead = cur.read_u8()?;
222 let tag = match lead {
223 V2_MAGIC => {
224 let wire_version = cur.read_u8()?;
225 if wire_version != V2_WIRE_VERSION {
226 return Err(HandleDecodeError::BadWireVersion { got: wire_version });
227 }
228 let tag_byte = cur.read_u8()?;
229 BackendTag::from_wire_byte(tag_byte)
230 .ok_or(HandleDecodeError::BadBackendTag { got: tag_byte })?
231 }
232 V1_VERSION_TAG => BackendTag::Valkey,
233 other => return Err(HandleDecodeError::BadV1Version { got: other }),
234 };
235 let payload = read_fields(&mut cur)?;
236 cur.expect_eof()?;
237 Ok(DecodedHandle { tag, payload })
238}
239
/// Builds the raw bytes of a v1-layout handle for `payload`.
///
/// The result is `V1_VERSION_TAG` followed by the payload fields, with no
/// backend-tag byte. Compiled only when the `test-fixtures` feature is on.
#[cfg(feature = "test-fixtures")]
pub fn v1_handle_for_tests(payload: &HandlePayload) -> Vec<u8> {
    let mut raw = Vec::<u8>::with_capacity(256);
    raw.push(V1_VERSION_TAG);
    write_fields(&mut raw, payload);
    raw
}
263
264fn write_fields(buf: &mut Vec<u8>, f: &HandlePayload) {
267 write_str(buf, &f.execution_id.to_string());
268 buf.extend_from_slice(&f.attempt_index.0.to_le_bytes());
269 write_str(buf, &f.attempt_id.to_string());
270 write_str(buf, &f.lease_id.to_string());
271 buf.extend_from_slice(&f.lease_epoch.0.to_le_bytes());
272 buf.extend_from_slice(&f.lease_ttl_ms.to_le_bytes());
273 write_str(buf, f.lane_id.as_str());
274 write_str(buf, f.worker_instance_id.as_str());
275}
276
277fn read_fields(cur: &mut Cursor<'_>) -> Result<HandlePayload, HandleDecodeError> {
278 let execution_id_str = cur.read_str("execution_id")?;
279 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
280 HandleDecodeError::ParseField {
281 field: "execution_id",
282 detail: e.to_string(),
283 }
284 })?;
285 let attempt_index = AttemptIndex::new(cur.read_u32()?);
286 let attempt_id_str = cur.read_str("attempt_id")?;
287 let attempt_id =
288 AttemptId::parse(&attempt_id_str).map_err(|e| HandleDecodeError::ParseField {
289 field: "attempt_id",
290 detail: e.to_string(),
291 })?;
292 let lease_id_str = cur.read_str("lease_id")?;
293 let lease_id = LeaseId::parse(&lease_id_str).map_err(|e| HandleDecodeError::ParseField {
294 field: "lease_id",
295 detail: e.to_string(),
296 })?;
297 let lease_epoch = LeaseEpoch(cur.read_u64()?);
298 let lease_ttl_ms = cur.read_u64()?;
299 let lane_id_str = cur.read_str("lane_id")?;
300 let lane_id = LaneId::new(lane_id_str);
301 let worker_str = cur.read_str("worker_instance_id")?;
302 let worker_instance_id = WorkerInstanceId::new(worker_str);
303 Ok(HandlePayload {
304 execution_id,
305 attempt_index,
306 attempt_id,
307 lease_id,
308 lease_epoch,
309 lease_ttl_ms,
310 lane_id,
311 worker_instance_id,
312 })
313}
314
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// UTF-8 bytes.
///
/// A string longer than `u32::MAX` bytes is cut to its first `u32::MAX` bytes
/// so the length prefix always matches what is written.
// NOTE(review): that cut is byte-based, so it could split a multi-byte
// character and leave a tail that fails UTF-8 validation on decode.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    // Saturate the prefix at `u32::MAX` when the real length does not fit.
    let prefix = u32::try_from(raw.len()).unwrap_or(u32::MAX);
    let body = &raw[..prefix as usize];
    buf.extend_from_slice(&prefix.to_le_bytes());
    buf.extend_from_slice(body);
}
328
329struct Cursor<'a> {
330 bytes: &'a [u8],
331 pos: usize,
332}
333
334impl<'a> Cursor<'a> {
335 fn new(bytes: &'a [u8]) -> Self {
336 Self { bytes, pos: 0 }
337 }
338
339 fn take(&mut self, n: usize) -> Result<&'a [u8], HandleDecodeError> {
340 if self.pos + n > self.bytes.len() {
341 return Err(HandleDecodeError::Truncated {
342 needed: n,
343 at: self.pos,
344 have: self.bytes.len(),
345 });
346 }
347 let slice = &self.bytes[self.pos..self.pos + n];
348 self.pos += n;
349 Ok(slice)
350 }
351
352 fn read_u8(&mut self) -> Result<u8, HandleDecodeError> {
353 Ok(self.take(1)?[0])
354 }
355
356 fn read_u32(&mut self) -> Result<u32, HandleDecodeError> {
357 let mut b = [0u8; 4];
358 b.copy_from_slice(self.take(4)?);
359 Ok(u32::from_le_bytes(b))
360 }
361
362 fn read_u64(&mut self) -> Result<u64, HandleDecodeError> {
363 let mut b = [0u8; 8];
364 b.copy_from_slice(self.take(8)?);
365 Ok(u64::from_le_bytes(b))
366 }
367
368 fn read_str(&mut self, field: &'static str) -> Result<String, HandleDecodeError> {
369 let len = self.read_u32()? as usize;
370 let bytes = self.take(len)?;
371 String::from_utf8(bytes.to_vec())
372 .map_err(|_| HandleDecodeError::InvalidUtf8 { field })
373 }
374
375 fn expect_eof(&self) -> Result<(), HandleDecodeError> {
376 if self.pos != self.bytes.len() {
377 return Err(HandleDecodeError::TrailingBytes {
378 pos: self.pos,
379 len: self.bytes.len(),
380 });
381 }
382 Ok(())
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::PartitionConfig;

    /// Wraps raw bytes in a `HandleOpaque` so they can be fed to `decode`.
    fn opaque_from(bytes: &[u8]) -> HandleOpaque {
        HandleOpaque::new(Box::from(bytes))
    }

    /// Payload fixture shared by the round-trip tests.
    fn sample_payload() -> HandlePayload {
        let lane = LaneId::new("default");
        HandlePayload::new(
            ExecutionId::solo(&lane, &PartitionConfig::default()),
            AttemptIndex::new(3),
            AttemptId::new(),
            LeaseId::new(),
            LeaseEpoch(7),
            30_000,
            lane,
            WorkerInstanceId::new("worker-1"),
        )
    }

    /// A Valkey handle starts with the full v2 header and decodes back.
    #[test]
    fn round_trip_valkey_v2() {
        let payload = sample_payload();
        let handle = encode(BackendTag::Valkey, &payload);
        assert_eq!(
            &handle.as_bytes()[..3],
            &[V2_MAGIC, V2_WIRE_VERSION, BackendTag::Valkey.wire_byte()][..]
        );
        let back = decode(&handle).expect("round-trip");
        assert_eq!(back.tag, BackendTag::Valkey);
        assert_eq!(back.payload, payload);
    }

    /// Same round trip with the Postgres tag byte in position 2.
    #[test]
    fn round_trip_postgres_v2() {
        let payload = sample_payload();
        let handle = encode(BackendTag::Postgres, &payload);
        assert_eq!(handle.as_bytes()[2], BackendTag::Postgres.wire_byte());
        let back = decode(&handle).expect("round-trip");
        assert_eq!(back.tag, BackendTag::Postgres);
        assert_eq!(back.payload, payload);
    }

    /// Same round trip with the Sqlite tag byte in position 2.
    #[test]
    fn round_trip_sqlite_v2() {
        let payload = sample_payload();
        let handle = encode(BackendTag::Sqlite, &payload);
        assert_eq!(handle.as_bytes()[2], BackendTag::Sqlite.wire_byte());
        let back = decode(&handle).expect("round-trip");
        assert_eq!(back.tag, BackendTag::Sqlite);
        assert_eq!(back.payload, payload);
    }

    /// Pins the Sqlite wire byte.
    #[test]
    fn wire_byte_sqlite() {
        assert_eq!(BackendTag::Sqlite.wire_byte(), 0x03);
    }

    /// Pins the reverse mapping for the Sqlite wire byte.
    #[test]
    fn from_wire_byte_sqlite() {
        assert_eq!(BackendTag::from_wire_byte(0x03), Some(BackendTag::Sqlite));
    }

    /// A v1-layout buffer (no tag byte) decodes as a Valkey handle.
    #[test]
    fn old_v1_format_decodes_as_valkey() {
        let payload = sample_payload();
        let mut raw = vec![V1_VERSION_TAG];
        write_fields(&mut raw, &payload);
        let back = decode(&HandleOpaque::new(raw.into_boxed_slice())).expect("v1 compat decode");
        assert_eq!(back.tag, BackendTag::Valkey);
        assert_eq!(back.payload, payload);
    }

    /// A lone magic byte is too short to hold the rest of the header.
    #[test]
    fn truncated_handle_rejected() {
        let failure = decode(&opaque_from(&[V2_MAGIC])).unwrap_err();
        assert!(matches!(failure, HandleDecodeError::Truncated { .. }));
    }

    /// An unknown byte after the magic is rejected.
    #[test]
    fn bad_v2_wire_version_rejected() {
        let failure = decode(&opaque_from(&[V2_MAGIC, 0xFF, 0x01])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadWireVersion { got: 0xFF });
    }

    /// An unknown backend-tag byte is rejected.
    #[test]
    fn bad_backend_tag_rejected() {
        let failure = decode(&opaque_from(&[V2_MAGIC, V2_WIRE_VERSION, 0xFE])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadBackendTag { got: 0xFE });
    }

    /// A lead byte that matches neither format is rejected.
    #[test]
    fn bad_leading_byte_rejected() {
        let failure = decode(&opaque_from(&[0xAB])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadV1Version { got: 0xAB });
    }

    /// The feature-gated fixture builder emits the v1 layout and it decodes.
    #[cfg(feature = "test-fixtures")]
    #[test]
    fn v1_handle_for_tests_round_trip() {
        let payload = sample_payload();
        let fixture = super::v1_handle_for_tests(&payload);
        assert_eq!(fixture[0], V1_VERSION_TAG);

        let mut by_hand = vec![V1_VERSION_TAG];
        write_fields(&mut by_hand, &payload);
        assert_eq!(fixture, by_hand);

        let back =
            decode(&HandleOpaque::new(fixture.into_boxed_slice())).expect("v1 fixture decodes");
        assert_eq!(back.tag, BackendTag::Valkey);
        assert_eq!(back.payload, payload);
    }

    /// Decode failures convert to `Validation` / `Corruption` with the codec prefix.
    #[test]
    fn decode_error_maps_to_validation_corruption() {
        let source = HandleDecodeError::Truncated {
            needed: 4,
            at: 1,
            have: 1,
        };
        match EngineError::from(source) {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::Corruption);
                assert!(detail.contains("handle_codec"));
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}