Skip to main content

ff_core/
handle_codec.rs

1//! Backend-agnostic codec for [`HandleOpaque`] payloads.
2//!
3//! **RFC-v0.7 Wave 1c.** Extracted from `ff-backend-valkey` so the
4//! Postgres backend (Wave 4+) decodes the same wire shape. Worker-B
5//! §4.1 flagged `handle_codec` as one of the top Valkey-shape-leaking
6//! subsystems in the v0.7 migration master spec; this module is the
7//! relocation target.
8//!
9//! # Responsibilities
10//!
11//! * Serialize a [`HandlePayload`] (attempt-cookie fields every
12//!   backend needs to route an op) into the opaque byte buffer carried
13//!   inside a [`Handle`]. See [`encode`].
14//! * Parse that buffer back into a [`HandlePayload`] plus the
15//!   [`BackendTag`] that minted it. See [`decode`].
//! * Expose a typed [`HandleDecodeError`] mapping cleanly to
//!   `EngineError::Validation { kind: Corruption, .. }` at the backend
//!   boundary. See the `From<HandleDecodeError>` impl for [`EngineError`].
19//!
20//! # Wire format v2
21//!
22//! New-format opaque buffers are self-identifying — the leading byte
23//! distinguishes pre-Wave-1c (Valkey-only) buffers from v2 buffers:
24//!
25//! ```text
26//! v2 layout (Wave 1c+):
27//!   [0]       u8    magic (0x02) — new-format marker
28//!   [1]       u8    wire version (today: 0x01)
29//!   [2]       u8    BackendTag wire byte (0x01 = Valkey, 0x02 = Postgres)
30//!   [3..]             fields (see §Fields)
31//!
32//! v1 layout (pre-Wave-1c, Valkey-only — read-only compat):
33//!   [0]       u8    wire version (0x01)
34//!   [1..]             fields (§Fields)
35//! ```
36//!
37//! ## §Fields
38//!
39//! ```text
40//! u32(LE)  execution_id length
41//! utf8     execution_id bytes
42//! u32(LE)  attempt_index
43//! u32(LE)  attempt_id length
44//! utf8     attempt_id bytes
45//! u32(LE)  lease_id length
46//! utf8     lease_id bytes
47//! u64(LE)  lease_epoch
48//! u64(LE)  lease_ttl_ms
49//! u32(LE)  lane_id length
50//! utf8     lane_id bytes
51//! u32(LE)  worker_instance_id length
52//! utf8     worker_instance_id bytes
53//! ```
54//!
55//! # Backward-compat
56//!
57//! v1 is Valkey-only: any pre-Wave-1c handle in-flight at upgrade time
58//! decodes under `BackendTag::Valkey`. The magic byte `0x02` was
59//! chosen to be disjoint from the v1 leading byte (`0x01` = version
60//! tag) — the decoder switches on the first byte.
61
62use crate::backend::{BackendTag, HandleOpaque};
63use crate::engine_error::{EngineError, ValidationKind};
64use crate::types::{
65    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
66};
67
/// Leading byte of a legacy v1 (pre-Wave-1c) buffer, where it served as the
/// version tag. Only the compat decode path ever looks for it.
const V1_VERSION_TAG: u8 = 0x01;
/// Leading byte of every v2 "new-format" buffer. Kept different from
/// `V1_VERSION_TAG` so the decoder can pick the layout from byte 0 alone.
const V2_MAGIC: u8 = 0x02;
/// Wire version stored in byte 1 of a v2 buffer. Bump it whenever the field
/// layout changes.
const V2_WIRE_VERSION: u8 = 0x01;
75
76/// Decoded view of an encoded [`HandleOpaque`] — the minimum set of
77/// fields every backend needs to construct its per-op KEYS + ARGV.
78///
79/// `#[non_exhaustive]`: fields grow additively when new attempt-cookie
80/// state lands (e.g. a Wave-5 partition routing hint). Construct via
81/// [`HandlePayload::new`]; field-access is via the pub fields.
82#[derive(Clone, Debug, PartialEq, Eq)]
83#[non_exhaustive]
84pub struct HandlePayload {
85    pub execution_id: ExecutionId,
86    pub attempt_index: AttemptIndex,
87    pub attempt_id: AttemptId,
88    pub lease_id: LeaseId,
89    pub lease_epoch: LeaseEpoch,
90    pub lease_ttl_ms: u64,
91    pub lane_id: LaneId,
92    pub worker_instance_id: WorkerInstanceId,
93}
94
95impl HandlePayload {
96    /// Construct a payload. All fields are mandatory at Wave 1c — the
97    /// `#[non_exhaustive]` guard lets future fields land additively.
98    #[allow(clippy::too_many_arguments)] // all 8 fields are required attempt-cookie state
99    pub fn new(
100        execution_id: ExecutionId,
101        attempt_index: AttemptIndex,
102        attempt_id: AttemptId,
103        lease_id: LeaseId,
104        lease_epoch: LeaseEpoch,
105        lease_ttl_ms: u64,
106        lane_id: LaneId,
107        worker_instance_id: WorkerInstanceId,
108    ) -> Self {
109        Self {
110            execution_id,
111            attempt_index,
112            attempt_id,
113            lease_id,
114            lease_epoch,
115            lease_ttl_ms,
116            lane_id,
117            worker_instance_id,
118        }
119    }
120}
121
122/// Output of [`decode`] — the decoded [`HandlePayload`] plus the
123/// [`BackendTag`] embedded in the opaque buffer (or inferred from the
124/// v1 compat path).
125#[derive(Clone, Debug, PartialEq, Eq)]
126#[non_exhaustive]
127pub struct DecodedHandle {
128    pub tag: BackendTag,
129    pub payload: HandlePayload,
130}
131
132impl DecodedHandle {
133    pub fn new(tag: BackendTag, payload: HandlePayload) -> Self {
134        Self { tag, payload }
135    }
136}
137
138/// Typed decode-failure classification. Mapped to
139/// `EngineError::Validation { kind: Corruption, .. }` at the backend
140/// boundary via the [`From`] impl.
141#[derive(Debug, Clone, PartialEq, Eq)]
142#[non_exhaustive]
143pub enum HandleDecodeError {
144    /// Buffer shorter than required by the current read position.
145    Truncated { needed: usize, at: usize, have: usize },
146    /// Trailing bytes after the last expected field.
147    TrailingBytes { pos: usize, len: usize },
148    /// v2 wire-version byte did not match a supported version.
149    BadWireVersion { got: u8 },
150    /// v1-path version byte did not match the expected tag.
151    BadV1Version { got: u8 },
152    /// v2 backend-tag byte did not map to a known [`BackendTag`].
153    BadBackendTag { got: u8 },
154    /// A length-prefixed string had invalid UTF-8.
155    InvalidUtf8 { field: &'static str },
156    /// A parsed id/field failed its own validation (e.g. `ExecutionId::parse`).
157    ParseField { field: &'static str, detail: String },
158}
159
160impl std::fmt::Display for HandleDecodeError {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            HandleDecodeError::Truncated { needed, at, have } => write!(
164                f,
165                "truncated handle: needed {needed} bytes at offset {at}, have {have}"
166            ),
167            HandleDecodeError::TrailingBytes { pos, len } => {
168                write!(f, "trailing bytes in handle: pos={pos}, len={len}")
169            }
170            HandleDecodeError::BadWireVersion { got } => {
171                write!(f, "handle v2 wire-version byte {got:#x} not recognised")
172            }
173            HandleDecodeError::BadV1Version { got } => write!(
174                f,
175                "handle v1 version byte {got:#x} not recognised (expected {V1_VERSION_TAG:#x})"
176            ),
177            HandleDecodeError::BadBackendTag { got } => {
178                write!(f, "handle backend-tag byte {got:#x} not recognised")
179            }
180            HandleDecodeError::InvalidUtf8 { field } => {
181                write!(f, "handle field `{field}` is not valid UTF-8")
182            }
183            HandleDecodeError::ParseField { field, detail } => {
184                write!(f, "handle field `{field}` parse failed: {detail}")
185            }
186        }
187    }
188}
189
190impl std::error::Error for HandleDecodeError {}
191
192impl From<HandleDecodeError> for EngineError {
193    fn from(err: HandleDecodeError) -> Self {
194        EngineError::Validation {
195            kind: ValidationKind::Corruption,
196            detail: format!("handle_codec: {err}"),
197        }
198    }
199}
200
201/// Encode a [`HandlePayload`] into a [`HandleOpaque`] tagged with
202/// `tag`. Produces a v2 buffer — callers decoding via [`decode`] always
203/// get the tag they encoded.
204pub fn encode(tag: BackendTag, payload: &HandlePayload) -> HandleOpaque {
205    let mut buf: Vec<u8> = Vec::with_capacity(256);
206    buf.push(V2_MAGIC);
207    buf.push(V2_WIRE_VERSION);
208    buf.push(tag.wire_byte());
209    write_fields(&mut buf, payload);
210    HandleOpaque::new(buf.into_boxed_slice())
211}
212
213/// Decode a [`HandleOpaque`] into a [`DecodedHandle`]. Accepts both
214/// v2 buffers (leading `V2_MAGIC`) and legacy v1 Valkey buffers
215/// (leading `V1_VERSION_TAG`); the v1 path returns
216/// `tag = BackendTag::Valkey`.
217pub fn decode(opaque: &HandleOpaque) -> Result<DecodedHandle, HandleDecodeError> {
218    let bytes = opaque.as_bytes();
219    let mut cur = Cursor::new(bytes);
220    let lead = cur.read_u8()?;
221    let tag = match lead {
222        V2_MAGIC => {
223            let wire_version = cur.read_u8()?;
224            if wire_version != V2_WIRE_VERSION {
225                return Err(HandleDecodeError::BadWireVersion { got: wire_version });
226            }
227            let tag_byte = cur.read_u8()?;
228            BackendTag::from_wire_byte(tag_byte)
229                .ok_or(HandleDecodeError::BadBackendTag { got: tag_byte })?
230        }
231        V1_VERSION_TAG => BackendTag::Valkey,
232        other => return Err(HandleDecodeError::BadV1Version { got: other }),
233    };
234    let payload = read_fields(&mut cur)?;
235    cur.expect_eof()?;
236    Ok(DecodedHandle { tag, payload })
237}
238
239// ── Field serialization ─────────────────────────────────────────────────
240
241fn write_fields(buf: &mut Vec<u8>, f: &HandlePayload) {
242    write_str(buf, &f.execution_id.to_string());
243    buf.extend_from_slice(&f.attempt_index.0.to_le_bytes());
244    write_str(buf, &f.attempt_id.to_string());
245    write_str(buf, &f.lease_id.to_string());
246    buf.extend_from_slice(&f.lease_epoch.0.to_le_bytes());
247    buf.extend_from_slice(&f.lease_ttl_ms.to_le_bytes());
248    write_str(buf, f.lane_id.as_str());
249    write_str(buf, f.worker_instance_id.as_str());
250}
251
252fn read_fields(cur: &mut Cursor<'_>) -> Result<HandlePayload, HandleDecodeError> {
253    let execution_id_str = cur.read_str("execution_id")?;
254    let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
255        HandleDecodeError::ParseField {
256            field: "execution_id",
257            detail: e.to_string(),
258        }
259    })?;
260    let attempt_index = AttemptIndex::new(cur.read_u32()?);
261    let attempt_id_str = cur.read_str("attempt_id")?;
262    let attempt_id =
263        AttemptId::parse(&attempt_id_str).map_err(|e| HandleDecodeError::ParseField {
264            field: "attempt_id",
265            detail: e.to_string(),
266        })?;
267    let lease_id_str = cur.read_str("lease_id")?;
268    let lease_id = LeaseId::parse(&lease_id_str).map_err(|e| HandleDecodeError::ParseField {
269        field: "lease_id",
270        detail: e.to_string(),
271    })?;
272    let lease_epoch = LeaseEpoch(cur.read_u64()?);
273    let lease_ttl_ms = cur.read_u64()?;
274    let lane_id_str = cur.read_str("lane_id")?;
275    let lane_id = LaneId::new(lane_id_str);
276    let worker_str = cur.read_str("worker_instance_id")?;
277    let worker_instance_id = WorkerInstanceId::new(worker_str);
278    Ok(HandlePayload {
279        execution_id,
280        attempt_index,
281        attempt_id,
282        lease_id,
283        lease_epoch,
284        lease_ttl_ms,
285        lane_id,
286        worker_instance_id,
287    })
288}
289
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// UTF-8 bytes themselves.
///
/// A string longer than `u32::MAX` bytes (not expected at attempt-cookie
/// scale) is clamped instead of panicking: the prefix becomes `u32::MAX` and
/// only the first `u32::MAX` bytes are written. The prefix and the bytes
/// stay consistent with each other, so the decoder has no structural way to
/// notice the clamp — the cut would surface only if it splits a multi-byte
/// character or the shortened id fails its own parse.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    // Saturate the prefix at u32::MAX when the real length does not fit.
    let prefix = u32::try_from(raw.len()).unwrap_or(u32::MAX);
    // Equals `raw.len()` whenever the length fit, so nothing is cut then.
    let kept = raw.len().min(prefix as usize);
    buf.extend_from_slice(&prefix.to_le_bytes());
    buf.extend_from_slice(&raw[..kept]);
}
303
304struct Cursor<'a> {
305    bytes: &'a [u8],
306    pos: usize,
307}
308
309impl<'a> Cursor<'a> {
310    fn new(bytes: &'a [u8]) -> Self {
311        Self { bytes, pos: 0 }
312    }
313
314    fn take(&mut self, n: usize) -> Result<&'a [u8], HandleDecodeError> {
315        if self.pos + n > self.bytes.len() {
316            return Err(HandleDecodeError::Truncated {
317                needed: n,
318                at: self.pos,
319                have: self.bytes.len(),
320            });
321        }
322        let slice = &self.bytes[self.pos..self.pos + n];
323        self.pos += n;
324        Ok(slice)
325    }
326
327    fn read_u8(&mut self) -> Result<u8, HandleDecodeError> {
328        Ok(self.take(1)?[0])
329    }
330
331    fn read_u32(&mut self) -> Result<u32, HandleDecodeError> {
332        let mut b = [0u8; 4];
333        b.copy_from_slice(self.take(4)?);
334        Ok(u32::from_le_bytes(b))
335    }
336
337    fn read_u64(&mut self) -> Result<u64, HandleDecodeError> {
338        let mut b = [0u8; 8];
339        b.copy_from_slice(self.take(8)?);
340        Ok(u64::from_le_bytes(b))
341    }
342
343    fn read_str(&mut self, field: &'static str) -> Result<String, HandleDecodeError> {
344        let len = self.read_u32()? as usize;
345        let bytes = self.take(len)?;
346        String::from_utf8(bytes.to_vec())
347            .map_err(|_| HandleDecodeError::InvalidUtf8 { field })
348    }
349
350    fn expect_eof(&self) -> Result<(), HandleDecodeError> {
351        if self.pos != self.bytes.len() {
352            return Err(HandleDecodeError::TrailingBytes {
353                pos: self.pos,
354                len: self.bytes.len(),
355            });
356        }
357        Ok(())
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::PartitionConfig;

    /// Wraps raw bytes in a [`HandleOpaque`] so they can be fed to `decode`.
    fn opaque_of(raw: &[u8]) -> HandleOpaque {
        HandleOpaque::new(raw.to_vec().into_boxed_slice())
    }

    /// A fully-populated payload shared by the round-trip tests.
    fn sample_payload() -> HandlePayload {
        HandlePayload::new(
            ExecutionId::solo(&LaneId::new("default"), &PartitionConfig::default()),
            AttemptIndex::new(3),
            AttemptId::new(),
            LeaseId::new(),
            LeaseEpoch(7),
            30_000,
            LaneId::new("default"),
            WorkerInstanceId::new("worker-1"),
        )
    }

    /// Encoding for Valkey yields the three-byte v2 header and decodes back
    /// to the same tag and payload.
    #[test]
    fn round_trip_valkey_v2() {
        let original = sample_payload();
        let opaque = encode(BackendTag::Valkey, &original);

        let header = &opaque.as_bytes()[..3];
        assert_eq!(header[0], V2_MAGIC);
        assert_eq!(header[1], V2_WIRE_VERSION);
        assert_eq!(header[2], BackendTag::Valkey.wire_byte());

        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, original);
    }

    /// Same round trip with the Postgres tag byte.
    #[test]
    fn round_trip_postgres_v2() {
        let original = sample_payload();
        let opaque = encode(BackendTag::Postgres, &original);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Postgres.wire_byte());

        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Postgres);
        assert_eq!(decoded.payload, original);
    }

    /// Regression guard, non-negotiable per the Wave 1c spec: a buffer from
    /// the pre-Wave-1c Valkey-only codec (leading byte 0x01, no magic, no
    /// embedded backend tag) must still decode as `BackendTag::Valkey`.
    #[test]
    fn old_v1_format_decodes_as_valkey() {
        let original = sample_payload();
        // Rebuild, byte for byte, what the pre-Wave-1c `encode_handle` in
        // ff-backend-valkey produced: one version byte, then the fields.
        let mut legacy = vec![V1_VERSION_TAG];
        write_fields(&mut legacy, &original);

        let decoded = decode(&opaque_of(&legacy)).expect("v1 compat decode");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, original);
    }

    /// A lone magic byte leaves nothing for the wire-version read.
    #[test]
    fn truncated_handle_rejected() {
        let failure = decode(&opaque_of(&[V2_MAGIC])).unwrap_err();
        assert!(matches!(failure, HandleDecodeError::Truncated { .. }));
    }

    /// v2 magic followed by an unknown wire version.
    #[test]
    fn bad_v2_wire_version_rejected() {
        let failure = decode(&opaque_of(&[V2_MAGIC, 0xFF, 0x01])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadWireVersion { got: 0xFF });
    }

    /// v2 magic and a valid wire version, then a bogus backend tag byte.
    #[test]
    fn bad_backend_tag_rejected() {
        let failure = decode(&opaque_of(&[V2_MAGIC, V2_WIRE_VERSION, 0xFE])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadBackendTag { got: 0xFE });
    }

    /// A leading byte that is neither the v2 magic nor the v1 version tag.
    #[test]
    fn bad_leading_byte_rejected() {
        let failure = decode(&opaque_of(&[0xAB])).unwrap_err();
        assert_eq!(failure, HandleDecodeError::BadV1Version { got: 0xAB });
    }

    /// Decode errors cross the backend boundary as `Validation`/`Corruption`
    /// with a `handle_codec` marker in the detail text.
    #[test]
    fn decode_error_maps_to_validation_corruption() {
        let source = HandleDecodeError::Truncated {
            needed: 4,
            at: 1,
            have: 1,
        };
        match EngineError::from(source) {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::Corruption);
                assert!(detail.contains("handle_codec"));
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}