Skip to main content

liminal/aion/
codec.rs

1use serde_json::{Value, json};
2
3use super::types::{ActivityRequest, ActivityResult, Payload};
4use crate::channel::{Schema, SchemaValidationError};
5
/// Content type carried in the `content_type` field of dispatch request messages.
pub const DISPATCH_REQUEST_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.request+json";
/// Content type carried in the `content_type` field of dispatch response messages.
pub const DISPATCH_RESPONSE_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.response+json";
/// Content type carried in the `content_type` field of dispatch acknowledgement messages.
pub const DISPATCH_ACK_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.ack+json";
12
13/// Typed request sent through an activity dispatch conversation.
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct DispatchRequest {
16    /// Conversation correlation identifier.
17    pub conversation_id: String,
18    /// Activity request scheduled by the workflow.
19    pub request: ActivityRequest,
20}
21
22impl DispatchRequest {
23    /// Content type expected by typed dispatch request channels.
24    pub const CONTENT_TYPE: &'static str = DISPATCH_REQUEST_CONTENT_TYPE;
25
26    /// Creates a typed dispatch request.
27    #[must_use]
28    pub const fn new(conversation_id: String, request: ActivityRequest) -> Self {
29        Self {
30            conversation_id,
31            request,
32        }
33    }
34}
35
36/// Typed response returned by a worker through an activity dispatch conversation.
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct DispatchResponse {
39    /// Worker that produced the response.
40    pub worker_id: String,
41    /// Activity result returned by the worker.
42    pub result: ActivityResult,
43}
44
45impl DispatchResponse {
46    /// Content type expected by typed dispatch response channels.
47    pub const CONTENT_TYPE: &'static str = DISPATCH_RESPONSE_CONTENT_TYPE;
48
49    /// Creates a typed dispatch response.
50    #[must_use]
51    pub const fn new(worker_id: String, result: ActivityResult) -> Self {
52        Self { worker_id, result }
53    }
54}
55
/// Outcome reported when a dispatch request is published.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchAckStatus {
    /// The request was accepted for delivery.
    Accepted,
    /// Backpressure deferred the request; it remains buffered.
    Deferred,
    /// Backpressure or validation rejected the request.
    Rejected,
}

impl DispatchAckStatus {
    /// Returns the stable string that represents this status on the wire.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Accepted => "accepted",
            Self::Deferred => "deferred",
            Self::Rejected => "rejected",
        }
    }
}
78
79/// Typed acknowledgement for a dispatch request.
80#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct DispatchAck {
82    /// Conversation correlation identifier.
83    pub conversation_id: String,
84    /// Delivery acknowledgement status.
85    pub status: DispatchAckStatus,
86}
87
88impl DispatchAck {
89    /// Content type expected by typed dispatch acknowledgement channels.
90    pub const CONTENT_TYPE: &'static str = DISPATCH_ACK_CONTENT_TYPE;
91
92    /// Creates a typed dispatch acknowledgement.
93    #[must_use]
94    pub const fn new(conversation_id: String, status: DispatchAckStatus) -> Self {
95        Self {
96            conversation_id,
97            status,
98        }
99    }
100}
101
102/// Builds the JSON Schema used to validate dispatch request payloads.
103///
104/// # Errors
105///
106/// Returns [`SchemaValidationError`] if the schema definition cannot be compiled.
107pub fn dispatch_request_schema() -> Result<Schema, SchemaValidationError> {
108    typed_schema(
109        DISPATCH_REQUEST_CONTENT_TYPE,
110        &json!({
111            "conversation_id": {"type": "string", "minLength": 1},
112            "request": {"type": "object"},
113            "worker_id": false,
114            "result": false,
115            "status": false
116        }),
117        &["conversation_id", "request"],
118    )
119}
120
121/// Builds the JSON Schema used to validate dispatch response payloads.
122///
123/// # Errors
124///
125/// Returns [`SchemaValidationError`] if the schema definition cannot be compiled.
126pub fn dispatch_response_schema() -> Result<Schema, SchemaValidationError> {
127    typed_schema(
128        DISPATCH_RESPONSE_CONTENT_TYPE,
129        &json!({
130            "conversation_id": false,
131            "request": false,
132            "worker_id": {"type": "string", "minLength": 1},
133            "result": {"type": "object"},
134            "status": false
135        }),
136        &["worker_id", "result"],
137    )
138}
139
140/// Builds the JSON Schema used to validate dispatch acknowledgement payloads.
141///
142/// # Errors
143///
144/// Returns [`SchemaValidationError`] if the schema definition cannot be compiled.
145pub fn dispatch_ack_schema() -> Result<Schema, SchemaValidationError> {
146    typed_schema(
147        DISPATCH_ACK_CONTENT_TYPE,
148        &json!({
149            "conversation_id": {"type": "string", "minLength": 1},
150            "request": false,
151            "worker_id": false,
152            "result": false,
153            "status": {"enum": ["accepted", "deferred", "rejected"]}
154        }),
155        &["conversation_id", "status"],
156    )
157}
158
159/// Encodes a typed request as JSON bytes for external producers.
160///
161/// # Errors
162///
163/// Returns [`serde_json::Error`] if the JSON payload cannot be serialized.
164pub fn encode_dispatch_request(message: &DispatchRequest) -> Result<Vec<u8>, serde_json::Error> {
165    serde_json::to_vec(&json!({
166        "content_type": DispatchRequest::CONTENT_TYPE,
167        "conversation_id": message.conversation_id,
168        "request": activity_request_value(&message.request)
169    }))
170}
171
172/// Encodes a typed response as JSON bytes for external producers.
173///
174/// # Errors
175///
176/// Returns [`serde_json::Error`] if the JSON payload cannot be serialized.
177pub fn encode_dispatch_response(message: &DispatchResponse) -> Result<Vec<u8>, serde_json::Error> {
178    serde_json::to_vec(&json!({
179        "content_type": DispatchResponse::CONTENT_TYPE,
180        "worker_id": message.worker_id,
181        "result": activity_result_value(&message.result)
182    }))
183}
184
185/// Encodes a typed acknowledgement as JSON bytes for external producers.
186///
187/// # Errors
188///
189/// Returns [`serde_json::Error`] if the JSON payload cannot be serialized.
190pub fn encode_dispatch_ack(message: &DispatchAck) -> Result<Vec<u8>, serde_json::Error> {
191    serde_json::to_vec(&json!({
192        "content_type": DispatchAck::CONTENT_TYPE,
193        "conversation_id": message.conversation_id,
194        "status": message.status.as_str()
195    }))
196}
197
198fn typed_schema(
199    content_type: &str,
200    properties: &Value,
201    required: &[&str],
202) -> Result<Schema, SchemaValidationError> {
203    let mut required_fields = vec!["content_type"];
204    required_fields.extend_from_slice(required);
205
206    Schema::new(json!({
207        "type": "object",
208        "properties": {
209            "content_type": {"const": content_type}
210        },
211        "required": required_fields,
212        "allOf": [{
213            "type": "object",
214            "properties": properties
215        }]
216    }))
217}
218
219fn activity_request_value(request: &ActivityRequest) -> Value {
220    json!({
221        "activity_type": request.activity_type,
222        "input": payload_value(&request.input),
223        "task_queue": request.task_queue,
224        "schedule_to_close_timeout_ms": request
225            .schedule_to_close_timeout
226            .map(|timeout| timeout.as_millis()),
227        "start_to_close_timeout_ms": request
228            .start_to_close_timeout
229            .map(|timeout| timeout.as_millis())
230    })
231}
232
233fn activity_result_value(result: &ActivityResult) -> Value {
234    match result {
235        ActivityResult::Completed { output } => json!({
236            "status": "completed",
237            "output": payload_value(output)
238        }),
239        ActivityResult::Failed { error } => json!({
240            "status": "failed",
241            "error": error.to_string()
242        }),
243    }
244}
245
246fn payload_value(payload: &Payload) -> Value {
247    json!({
248        "content_type": payload.content_type,
249        "data": payload.data
250    })
251}
252
#[cfg(test)]
mod tests {
    use std::error::Error;

    use super::{
        DISPATCH_ACK_CONTENT_TYPE, DISPATCH_REQUEST_CONTENT_TYPE, DISPATCH_RESPONSE_CONTENT_TYPE,
        DispatchAck, DispatchAckStatus, DispatchRequest, DispatchResponse,
        dispatch_request_schema, dispatch_response_schema, encode_dispatch_ack,
        encode_dispatch_request, encode_dispatch_response,
    };
    use crate::aion::{ActivityRequest, ActivityResult, Payload, dispatch_channel};
    use crate::channel::{ChannelConfig, ChannelHandle, ChannelMode, SchemaValidationError};
    use crate::error::LiminalError;

    /// Minimal activity request fixture with an empty JSON input and no timeouts.
    fn activity_request() -> ActivityRequest {
        ActivityRequest {
            activity_type: "send-email".to_owned(),
            input: Payload {
                data: b"{}".to_vec(),
                content_type: "application/json".to_owned(),
            },
            task_queue: "email".to_owned(),
            schedule_to_close_timeout: None,
            start_to_close_timeout: None,
        }
    }

    /// Re-encodes `encoded` with its `content_type` field replaced by `content_type`.
    fn with_content_type(
        encoded: &[u8],
        content_type: &str,
    ) -> Result<Vec<u8>, serde_json::Error> {
        let mut payload: serde_json::Value = serde_json::from_slice(encoded)?;
        payload["content_type"] = serde_json::Value::String(content_type.to_owned());
        serde_json::to_vec(&payload)
    }

    #[test]
    fn typed_dispatch_messages_carry_content_types() {
        assert_eq!(DispatchRequest::CONTENT_TYPE, DISPATCH_REQUEST_CONTENT_TYPE);
        assert_eq!(
            DispatchResponse::CONTENT_TYPE,
            DISPATCH_RESPONSE_CONTENT_TYPE
        );
        assert_eq!(DispatchAck::CONTENT_TYPE, DISPATCH_ACK_CONTENT_TYPE);
        assert_eq!(DispatchAckStatus::Accepted.as_str(), "accepted");
        assert_eq!(DispatchAckStatus::Deferred.as_str(), "deferred");
        assert_eq!(DispatchAckStatus::Rejected.as_str(), "rejected");
    }

    #[test]
    fn request_schema_accepts_matching_content_type() -> Result<(), Box<dyn Error>> {
        let message = DispatchRequest::new("conversation-1".to_owned(), activity_request());
        let schema = dispatch_request_schema()?;
        let payload = encode_dispatch_request(&message)?;

        schema.validate(payload)?;
        Ok(())
    }

    #[test]
    fn channel_rejects_mismatched_dispatch_content_type() -> Result<(), Box<dyn Error>> {
        let channel_name = dispatch_channel("prod", "email")?;
        let config = ChannelConfig::new(
            String::from(channel_name),
            dispatch_request_schema()?,
            ChannelMode::Ephemeral,
        );
        let channel = ChannelHandle::new(config);
        let rejected = channel
            .publish(br#"{"content_type":"application/json","conversation_id":"c1","request":{}}"#);

        assert!(matches!(rejected, Err(LiminalError::SchemaMismatch { .. })));
        Ok(())
    }

    #[test]
    fn acknowledgement_encoding_uses_status_string() -> Result<(), Box<dyn Error>> {
        let ack = DispatchAck::new("conversation-1".to_owned(), DispatchAckStatus::Deferred);
        let decoded: serde_json::Value = serde_json::from_slice(&encode_dispatch_ack(&ack)?)?;

        // Assert on the decoded fields so a stray "deferred" elsewhere cannot satisfy the test.
        assert_eq!(decoded["status"], "deferred");
        assert_eq!(decoded["conversation_id"], "conversation-1");
        assert_eq!(decoded["content_type"], DispatchAck::CONTENT_TYPE);
        Ok(())
    }

    #[test]
    fn request_schema_rejects_response_shape_even_with_request_content_type()
    -> Result<(), Box<dyn Error>> {
        let schema = dispatch_request_schema()?;
        let response = DispatchResponse::new(
            "worker-1".to_owned(),
            ActivityResult::Completed {
                output: Payload {
                    data: b"ok".to_vec(),
                    content_type: "application/octet-stream".to_owned(),
                },
            },
        );
        let encoded = encode_dispatch_response(&response)?;
        let retagged = with_content_type(&encoded, DispatchRequest::CONTENT_TYPE)?;

        let result = schema.validate(retagged);

        assert!(matches!(
            result,
            Err(SchemaValidationError::Mismatch { .. })
        ));
        Ok(())
    }

    #[test]
    fn response_schema_rejects_request_shape_even_with_response_content_type()
    -> Result<(), Box<dyn Error>> {
        let schema = dispatch_response_schema()?;
        let request = DispatchRequest::new("conversation-1".to_owned(), activity_request());
        let encoded = encode_dispatch_request(&request)?;
        let retagged = with_content_type(&encoded, DispatchResponse::CONTENT_TYPE)?;

        let result = schema.validate(retagged);

        assert!(matches!(
            result,
            Err(SchemaValidationError::Mismatch { .. })
        ));
        Ok(())
    }
}