Skip to main content

ff_core/
handle_codec.rs

1//! Backend-agnostic codec for [`HandleOpaque`] payloads.
2//!
3//! **RFC-v0.7 Wave 1c.** Extracted from `ff-backend-valkey` so the
4//! Postgres backend (Wave 4+) decodes the same wire shape. Worker-B
5//! §4.1 flagged `handle_codec` as one of the top Valkey-shape-leaking
6//! subsystems in the v0.7 migration master spec; this module is the
7//! relocation target.
8//!
9//! # Responsibilities
10//!
11//! * Serialize a [`HandlePayload`] (attempt-cookie fields every
12//!   backend needs to route an op) into the opaque byte buffer carried
13//!   inside a [`Handle`]. See [`encode`].
14//! * Parse that buffer back into a [`HandlePayload`] plus the
15//!   [`BackendTag`] that minted it. See [`decode`].
16//! * Expose a typed [`HandleDecodeError`] mapping cleanly to
17//!   `EngineError::Validation { kind: Corruption, .. }` at the backend
18//!   boundary. See the `From<HandleDecodeError>` impl for `EngineError`.
19//!
20//! # Wire format v2
21//!
22//! New-format opaque buffers are self-identifying — the leading byte
23//! distinguishes pre-Wave-1c (Valkey-only) buffers from v2 buffers:
24//!
25//! ```text
26//! v2 layout (Wave 1c+):
27//!   [0]       u8    magic (0x02) — new-format marker
28//!   [1]       u8    wire version (today: 0x01)
29//!   [2]       u8    BackendTag wire byte (0x01 = Valkey, 0x02 = Postgres,
30//!                                         0x03 = Sqlite — RFC-023)
31//!   [3..]             fields (see §Fields)
32//!
33//! v1 layout (pre-Wave-1c, Valkey-only — read-only compat):
34//!   [0]       u8    wire version (0x01)
35//!   [1..]             fields (§Fields)
36//! ```
37//!
38//! ## §Fields
39//!
40//! ```text
41//! u32(LE)  execution_id length
42//! utf8     execution_id bytes
43//! u32(LE)  attempt_index
44//! u32(LE)  attempt_id length
45//! utf8     attempt_id bytes
46//! u32(LE)  lease_id length
47//! utf8     lease_id bytes
48//! u64(LE)  lease_epoch
49//! u64(LE)  lease_ttl_ms
50//! u32(LE)  lane_id length
51//! utf8     lane_id bytes
52//! u32(LE)  worker_instance_id length
53//! utf8     worker_instance_id bytes
54//! ```
55//!
56//! # Backward-compat
57//!
58//! v1 is Valkey-only: any pre-Wave-1c handle in-flight at upgrade time
59//! decodes under `BackendTag::Valkey`. The magic byte `0x02` was
60//! chosen to be disjoint from the v1 leading byte (`0x01` = version
61//! tag) — the decoder switches on the first byte.
62
63use crate::backend::{BackendTag, HandleOpaque};
64use crate::engine_error::{EngineError, ValidationKind};
65use crate::types::{
66    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
67};
68
/// Leading byte of a legacy (pre-Wave-1c) buffer: the v1 version tag.
/// Only the compat decode path and v1 fixtures refer to it.
const V1_VERSION_TAG: u8 = 0x01;
/// Leading byte of a v2 ("new-format") buffer. Deliberately different
/// from `V1_VERSION_TAG` so the first byte alone selects the layout.
const V2_MAGIC: u8 = 0x02;
/// Wire version stored in the second byte of a v2 buffer. Bump it
/// whenever the field layout changes.
const V2_WIRE_VERSION: u8 = 0x01;
76
77/// Decoded view of an encoded [`HandleOpaque`] — the minimum set of
78/// fields every backend needs to construct its per-op KEYS + ARGV.
79///
80/// `#[non_exhaustive]`: fields grow additively when new attempt-cookie
81/// state lands (e.g. a Wave-5 partition routing hint). Construct via
82/// [`HandlePayload::new`]; field-access is via the pub fields.
83#[derive(Clone, Debug, PartialEq, Eq)]
84#[non_exhaustive]
85pub struct HandlePayload {
86    pub execution_id: ExecutionId,
87    pub attempt_index: AttemptIndex,
88    pub attempt_id: AttemptId,
89    pub lease_id: LeaseId,
90    pub lease_epoch: LeaseEpoch,
91    pub lease_ttl_ms: u64,
92    pub lane_id: LaneId,
93    pub worker_instance_id: WorkerInstanceId,
94}
95
96impl HandlePayload {
97    /// Construct a payload. All fields are mandatory at Wave 1c — the
98    /// `#[non_exhaustive]` guard lets future fields land additively.
99    #[allow(clippy::too_many_arguments)] // all 8 fields are required attempt-cookie state
100    pub fn new(
101        execution_id: ExecutionId,
102        attempt_index: AttemptIndex,
103        attempt_id: AttemptId,
104        lease_id: LeaseId,
105        lease_epoch: LeaseEpoch,
106        lease_ttl_ms: u64,
107        lane_id: LaneId,
108        worker_instance_id: WorkerInstanceId,
109    ) -> Self {
110        Self {
111            execution_id,
112            attempt_index,
113            attempt_id,
114            lease_id,
115            lease_epoch,
116            lease_ttl_ms,
117            lane_id,
118            worker_instance_id,
119        }
120    }
121}
122
123/// Output of [`decode`] — the decoded [`HandlePayload`] plus the
124/// [`BackendTag`] embedded in the opaque buffer (or inferred from the
125/// v1 compat path).
126#[derive(Clone, Debug, PartialEq, Eq)]
127#[non_exhaustive]
128pub struct DecodedHandle {
129    pub tag: BackendTag,
130    pub payload: HandlePayload,
131}
132
133impl DecodedHandle {
134    pub fn new(tag: BackendTag, payload: HandlePayload) -> Self {
135        Self { tag, payload }
136    }
137}
138
139/// Typed decode-failure classification. Mapped to
140/// `EngineError::Validation { kind: Corruption, .. }` at the backend
141/// boundary via the [`From`] impl.
142#[derive(Debug, Clone, PartialEq, Eq)]
143#[non_exhaustive]
144pub enum HandleDecodeError {
145    /// Buffer shorter than required by the current read position.
146    Truncated { needed: usize, at: usize, have: usize },
147    /// Trailing bytes after the last expected field.
148    TrailingBytes { pos: usize, len: usize },
149    /// v2 wire-version byte did not match a supported version.
150    BadWireVersion { got: u8 },
151    /// v1-path version byte did not match the expected tag.
152    BadV1Version { got: u8 },
153    /// v2 backend-tag byte did not map to a known [`BackendTag`].
154    BadBackendTag { got: u8 },
155    /// A length-prefixed string had invalid UTF-8.
156    InvalidUtf8 { field: &'static str },
157    /// A parsed id/field failed its own validation (e.g. `ExecutionId::parse`).
158    ParseField { field: &'static str, detail: String },
159}
160
161impl std::fmt::Display for HandleDecodeError {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            HandleDecodeError::Truncated { needed, at, have } => write!(
165                f,
166                "truncated handle: needed {needed} bytes at offset {at}, have {have}"
167            ),
168            HandleDecodeError::TrailingBytes { pos, len } => {
169                write!(f, "trailing bytes in handle: pos={pos}, len={len}")
170            }
171            HandleDecodeError::BadWireVersion { got } => {
172                write!(f, "handle v2 wire-version byte {got:#x} not recognised")
173            }
174            HandleDecodeError::BadV1Version { got } => write!(
175                f,
176                "handle v1 version byte {got:#x} not recognised (expected {V1_VERSION_TAG:#x})"
177            ),
178            HandleDecodeError::BadBackendTag { got } => {
179                write!(f, "handle backend-tag byte {got:#x} not recognised")
180            }
181            HandleDecodeError::InvalidUtf8 { field } => {
182                write!(f, "handle field `{field}` is not valid UTF-8")
183            }
184            HandleDecodeError::ParseField { field, detail } => {
185                write!(f, "handle field `{field}` parse failed: {detail}")
186            }
187        }
188    }
189}
190
191impl std::error::Error for HandleDecodeError {}
192
193impl From<HandleDecodeError> for EngineError {
194    fn from(err: HandleDecodeError) -> Self {
195        EngineError::Validation {
196            kind: ValidationKind::Corruption,
197            detail: format!("handle_codec: {err}"),
198        }
199    }
200}
201
202/// Encode a [`HandlePayload`] into a [`HandleOpaque`] tagged with
203/// `tag`. Produces a v2 buffer — callers decoding via [`decode`] always
204/// get the tag they encoded.
205pub fn encode(tag: BackendTag, payload: &HandlePayload) -> HandleOpaque {
206    let mut buf: Vec<u8> = Vec::with_capacity(256);
207    buf.push(V2_MAGIC);
208    buf.push(V2_WIRE_VERSION);
209    buf.push(tag.wire_byte());
210    write_fields(&mut buf, payload);
211    HandleOpaque::new(buf.into_boxed_slice())
212}
213
214/// Decode a [`HandleOpaque`] into a [`DecodedHandle`]. Accepts both
215/// v2 buffers (leading `V2_MAGIC`) and legacy v1 Valkey buffers
216/// (leading `V1_VERSION_TAG`); the v1 path returns
217/// `tag = BackendTag::Valkey`.
218pub fn decode(opaque: &HandleOpaque) -> Result<DecodedHandle, HandleDecodeError> {
219    let bytes = opaque.as_bytes();
220    let mut cur = Cursor::new(bytes);
221    let lead = cur.read_u8()?;
222    let tag = match lead {
223        V2_MAGIC => {
224            let wire_version = cur.read_u8()?;
225            if wire_version != V2_WIRE_VERSION {
226                return Err(HandleDecodeError::BadWireVersion { got: wire_version });
227            }
228            let tag_byte = cur.read_u8()?;
229            BackendTag::from_wire_byte(tag_byte)
230                .ok_or(HandleDecodeError::BadBackendTag { got: tag_byte })?
231        }
232        V1_VERSION_TAG => BackendTag::Valkey,
233        other => return Err(HandleDecodeError::BadV1Version { got: other }),
234    };
235    let payload = read_fields(&mut cur)?;
236    cur.expect_eof()?;
237    Ok(DecodedHandle { tag, payload })
238}
239
/// Build the pre-Wave-1c (v1) wire bytes for `payload`.
///
/// **Test-only**: compiled only with the `test-fixtures` feature, so
/// it cannot leak into production builds. The output is the on-wire
/// shape the pre-Wave-1c Valkey-only encoder produced: one
/// `V1_VERSION_TAG` (`0x01`) byte followed by the field block (see
/// module docs §Fields). Feeding it to [`decode`] takes the v1 compat
/// path and yields `tag = BackendTag::Valkey`.
///
/// Downstream consumers (e.g. cairn event-log migration tests,
/// issue #323) use it to check that persistent handles written under
/// FF 0.3 still decode cleanly under FF 0.9+.
///
/// There is deliberately no v1-specific decoder; v1 buffers are parsed
/// by [`decode`], and this function only synthesises input for it.
#[cfg(feature = "test-fixtures")]
pub fn v1_handle_for_tests(payload: &HandlePayload) -> Vec<u8> {
    let mut legacy: Vec<u8> = Vec::with_capacity(256);
    legacy.push(V1_VERSION_TAG);
    write_fields(&mut legacy, payload);
    legacy
}
263
264// ── Field serialization ─────────────────────────────────────────────────
265
266fn write_fields(buf: &mut Vec<u8>, f: &HandlePayload) {
267    write_str(buf, &f.execution_id.to_string());
268    buf.extend_from_slice(&f.attempt_index.0.to_le_bytes());
269    write_str(buf, &f.attempt_id.to_string());
270    write_str(buf, &f.lease_id.to_string());
271    buf.extend_from_slice(&f.lease_epoch.0.to_le_bytes());
272    buf.extend_from_slice(&f.lease_ttl_ms.to_le_bytes());
273    write_str(buf, f.lane_id.as_str());
274    write_str(buf, f.worker_instance_id.as_str());
275}
276
277fn read_fields(cur: &mut Cursor<'_>) -> Result<HandlePayload, HandleDecodeError> {
278    let execution_id_str = cur.read_str("execution_id")?;
279    let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
280        HandleDecodeError::ParseField {
281            field: "execution_id",
282            detail: e.to_string(),
283        }
284    })?;
285    let attempt_index = AttemptIndex::new(cur.read_u32()?);
286    let attempt_id_str = cur.read_str("attempt_id")?;
287    let attempt_id =
288        AttemptId::parse(&attempt_id_str).map_err(|e| HandleDecodeError::ParseField {
289            field: "attempt_id",
290            detail: e.to_string(),
291        })?;
292    let lease_id_str = cur.read_str("lease_id")?;
293    let lease_id = LeaseId::parse(&lease_id_str).map_err(|e| HandleDecodeError::ParseField {
294        field: "lease_id",
295        detail: e.to_string(),
296    })?;
297    let lease_epoch = LeaseEpoch(cur.read_u64()?);
298    let lease_ttl_ms = cur.read_u64()?;
299    let lane_id_str = cur.read_str("lane_id")?;
300    let lane_id = LaneId::new(lane_id_str);
301    let worker_str = cur.read_str("worker_instance_id")?;
302    let worker_instance_id = WorkerInstanceId::new(worker_str);
303    Ok(HandlePayload {
304        execution_id,
305        attempt_index,
306        attempt_id,
307        lease_id,
308        lease_epoch,
309        lease_ttl_ms,
310        lane_id,
311        worker_instance_id,
312    })
313}
314
/// Append `s` to `buf` as a little-endian `u32` byte length followed
/// by the UTF-8 bytes.
///
/// Never panics on length: a string longer than `u32::MAX` bytes
/// (not expected at attempt-cookie scale) is clamped, i.e. the prefix
/// is `u32::MAX` and only the first `u32::MAX` bytes are written.
fn write_str(buf: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    // The prefix is the number of bytes actually emitted: the real
    // length when it fits in a u32, otherwise the clamp value.
    let prefix = u32::try_from(raw.len()).unwrap_or(u32::MAX);
    buf.extend_from_slice(&prefix.to_le_bytes());
    // Lossless cast: `prefix` never exceeds `raw.len()`, which is a usize.
    buf.extend_from_slice(&raw[..prefix as usize]);
}
328
329struct Cursor<'a> {
330    bytes: &'a [u8],
331    pos: usize,
332}
333
334impl<'a> Cursor<'a> {
335    fn new(bytes: &'a [u8]) -> Self {
336        Self { bytes, pos: 0 }
337    }
338
339    fn take(&mut self, n: usize) -> Result<&'a [u8], HandleDecodeError> {
340        if self.pos + n > self.bytes.len() {
341            return Err(HandleDecodeError::Truncated {
342                needed: n,
343                at: self.pos,
344                have: self.bytes.len(),
345            });
346        }
347        let slice = &self.bytes[self.pos..self.pos + n];
348        self.pos += n;
349        Ok(slice)
350    }
351
352    fn read_u8(&mut self) -> Result<u8, HandleDecodeError> {
353        Ok(self.take(1)?[0])
354    }
355
356    fn read_u32(&mut self) -> Result<u32, HandleDecodeError> {
357        let mut b = [0u8; 4];
358        b.copy_from_slice(self.take(4)?);
359        Ok(u32::from_le_bytes(b))
360    }
361
362    fn read_u64(&mut self) -> Result<u64, HandleDecodeError> {
363        let mut b = [0u8; 8];
364        b.copy_from_slice(self.take(8)?);
365        Ok(u64::from_le_bytes(b))
366    }
367
368    fn read_str(&mut self, field: &'static str) -> Result<String, HandleDecodeError> {
369        let len = self.read_u32()? as usize;
370        let bytes = self.take(len)?;
371        String::from_utf8(bytes.to_vec())
372            .map_err(|_| HandleDecodeError::InvalidUtf8 { field })
373    }
374
375    fn expect_eof(&self) -> Result<(), HandleDecodeError> {
376        if self.pos != self.bytes.len() {
377            return Err(HandleDecodeError::TrailingBytes {
378                pos: self.pos,
379                len: self.bytes.len(),
380            });
381        }
382        Ok(())
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::PartitionConfig;

    /// Representative payload shared by the tests below.
    fn sample_payload() -> HandlePayload {
        HandlePayload {
            execution_id: ExecutionId::solo(
                &LaneId::new("default"),
                &PartitionConfig::default(),
            ),
            attempt_index: AttemptIndex::new(3),
            attempt_id: AttemptId::new(),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch(7),
            lease_ttl_ms: 30_000,
            lane_id: LaneId::new("default"),
            worker_instance_id: WorkerInstanceId::new("worker-1"),
        }
    }

    /// Reference v1 buffer: a single version byte (no magic, no backend
    /// tag) followed by the field block — byte-for-byte what the
    /// pre-Wave-1c `encode_handle` in ff-backend-valkey produced.
    fn reference_v1_bytes(p: &HandlePayload) -> Vec<u8> {
        let mut buf: Vec<u8> = vec![V1_VERSION_TAG];
        write_fields(&mut buf, p);
        buf
    }

    #[test]
    fn round_trip_valkey_v2() {
        let p = sample_payload();
        let opaque = encode(BackendTag::Valkey, &p);
        // v2 header: magic, wire version, backend tag.
        assert_eq!(opaque.as_bytes()[0], V2_MAGIC);
        assert_eq!(opaque.as_bytes()[1], V2_WIRE_VERSION);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Valkey.wire_byte());
        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn round_trip_postgres_v2() {
        let p = sample_payload();
        let opaque = encode(BackendTag::Postgres, &p);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Postgres.wire_byte());
        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Postgres);
        assert_eq!(decoded.payload, p);
    }

    /// RFC-023 Phase 2a.1.5 — `BackendTag::Sqlite` minted handles
    /// round-trip through the core codec with wire byte `0x03`.
    #[test]
    fn round_trip_sqlite_v2() {
        let p = sample_payload();
        let opaque = encode(BackendTag::Sqlite, &p);
        assert_eq!(opaque.as_bytes()[2], BackendTag::Sqlite.wire_byte());
        let decoded = decode(&opaque).expect("round-trip");
        assert_eq!(decoded.tag, BackendTag::Sqlite);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn wire_byte_sqlite() {
        assert_eq!(BackendTag::Sqlite.wire_byte(), 0x03);
    }

    #[test]
    fn from_wire_byte_sqlite() {
        assert_eq!(BackendTag::from_wire_byte(0x03), Some(BackendTag::Sqlite));
    }

    /// Regression guard: an opaque buffer produced by the pre-Wave-1c
    /// Valkey-only codec (leading byte = 0x01, no magic, no embedded
    /// backend tag) must still decode cleanly as `BackendTag::Valkey`.
    /// Non-negotiable per Wave 1c spec.
    #[test]
    fn old_v1_format_decodes_as_valkey() {
        let p = sample_payload();
        let opaque = HandleOpaque::new(reference_v1_bytes(&p).into_boxed_slice());
        let decoded = decode(&opaque).expect("v1 compat decode");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn truncated_handle_rejected() {
        // Only the magic byte — read_u8 for wire_version will fail.
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::Truncated { .. }));
    }

    /// A valid buffer with one extra byte appended must be rejected
    /// rather than silently accepted.
    #[test]
    fn trailing_bytes_rejected() {
        let p = sample_payload();
        let mut bytes = encode(BackendTag::Valkey, &p).as_bytes().to_vec();
        bytes.push(0x00);
        let opaque = HandleOpaque::new(bytes.into_boxed_slice());
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::TrailingBytes { .. }));
    }

    /// The first string field (`execution_id`) declares one byte and
    /// that byte is not valid UTF-8.
    #[test]
    fn invalid_utf8_rejected() {
        let opaque = HandleOpaque::new(Box::new([
            V1_VERSION_TAG,
            0x01,
            0x00,
            0x00,
            0x00,
            0xFF,
        ]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(
            err,
            HandleDecodeError::InvalidUtf8 { field: "execution_id" }
        ));
    }

    #[test]
    fn bad_v2_wire_version_rejected() {
        // v2 magic, unknown wire version.
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC, 0xFF, 0x01]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadWireVersion { got: 0xFF }));
    }

    #[test]
    fn bad_backend_tag_rejected() {
        // v2 magic + valid wire version + bogus tag byte.
        let opaque = HandleOpaque::new(Box::new([V2_MAGIC, V2_WIRE_VERSION, 0xFE]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadBackendTag { got: 0xFE }));
    }

    #[test]
    fn bad_leading_byte_rejected() {
        // Neither v2 magic nor v1 version tag.
        let opaque = HandleOpaque::new(Box::new([0xAB]));
        let err = decode(&opaque).unwrap_err();
        assert!(matches!(err, HandleDecodeError::BadV1Version { got: 0xAB }));
    }

    /// Issue #323: consumers building cross-version compat tests use
    /// `v1_handle_for_tests` to synthesise a v1 byte buffer, then
    /// round-trip it through [`decode`] to verify the v1 compat path
    /// still accepts 0.3-era persisted handles. The fixture must
    /// produce bytes identical to the reference v1 buffer used by
    /// `old_v1_format_decodes_as_valkey`.
    #[cfg(feature = "test-fixtures")]
    #[test]
    fn v1_handle_for_tests_round_trip() {
        let p = sample_payload();
        let v1_bytes = super::v1_handle_for_tests(&p);
        // Leading byte is v1 version tag (no v2 magic).
        assert_eq!(v1_bytes[0], V1_VERSION_TAG);
        // Byte-for-byte identical to the reference v1 synthesiser.
        assert_eq!(v1_bytes, reference_v1_bytes(&p));
        // Decodes via the v1 compat path → Valkey + original payload.
        let opaque = HandleOpaque::new(v1_bytes.into_boxed_slice());
        let decoded = decode(&opaque).expect("v1 fixture decodes");
        assert_eq!(decoded.tag, BackendTag::Valkey);
        assert_eq!(decoded.payload, p);
    }

    #[test]
    fn decode_error_maps_to_validation_corruption() {
        let err: EngineError = HandleDecodeError::Truncated {
            needed: 4,
            at: 1,
            have: 1,
        }
        .into();
        match err {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::Corruption);
                assert!(detail.contains("handle_codec"));
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}
543}