ff_backend_postgres/
handle_codec.rs1use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque};
16use ff_core::engine_error::{EngineError, ValidationKind};
17use ff_core::handle_codec::{decode as core_decode, encode as core_encode, HandlePayload};
18
19#[allow(dead_code)] pub(crate) fn encode_handle(payload: &HandlePayload, kind: HandleKind) -> Handle {
22 let opaque: HandleOpaque = core_encode(BackendTag::Postgres, payload);
23 Handle::new(BackendTag::Postgres, kind, opaque)
24}
25
26#[allow(dead_code)] pub(crate) fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
31 if handle.backend != BackendTag::Postgres {
32 return Err(EngineError::Validation {
33 kind: ValidationKind::HandleFromOtherBackend,
34 detail: format!(
35 "expected={:?} actual={:?}",
36 BackendTag::Postgres,
37 handle.backend
38 ),
39 });
40 }
41 let decoded = core_decode(&handle.opaque)?;
42 if decoded.tag != BackendTag::Postgres {
43 return Err(EngineError::Validation {
44 kind: ValidationKind::HandleFromOtherBackend,
45 detail: format!(
46 "expected={:?} actual={:?} (embedded tag)",
47 BackendTag::Postgres,
48 decoded.tag
49 ),
50 });
51 }
52 Ok(decoded.payload)
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use ff_core::partition::PartitionConfig;
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, WorkerInstanceId,
    };

    /// Builds a fixed-shape payload on the `"default"` lane for the tests below.
    fn sample_payload() -> HandlePayload {
        let execution_id =
            ExecutionId::solo(&LaneId::new("default"), &PartitionConfig::default());
        HandlePayload::new(
            execution_id,
            AttemptIndex::new(1),
            AttemptId::new(),
            LeaseId::new(),
            LeaseEpoch(1),
            30_000,
            LaneId::new("default"),
            WorkerInstanceId::new("pg-worker-1"),
        )
    }

    /// Encoding then decoding a fresh handle yields the original payload.
    #[test]
    fn round_trip_fresh() {
        let original = sample_payload();
        let handle = encode_handle(&original, HandleKind::Fresh);
        assert_eq!(handle.backend, BackendTag::Postgres);

        let decoded = decode_handle(&handle).expect("round-trip");
        assert_eq!(decoded, original);
    }

    /// A handle tagged and encoded for Valkey is refused by the Postgres decoder.
    #[test]
    fn valkey_handle_rejected_as_other_backend() {
        let payload = sample_payload();
        let foreign = Handle::new(
            BackendTag::Valkey,
            HandleKind::Fresh,
            core_encode(BackendTag::Valkey, &payload),
        );

        match decode_handle(&foreign).unwrap_err() {
            EngineError::Validation { kind, .. } => {
                assert_eq!(kind, ValidationKind::HandleFromOtherBackend);
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }
}