1use serde_json::{Value, json};
2
3use super::types::{ActivityRequest, ActivityResult, Payload};
4use crate::channel::{Schema, SchemaValidationError};
5
/// Content type tag carried by encoded dispatch request messages.
///
/// Written under the `content_type` key by the request encoder and required
/// verbatim by the request schema.
pub const DISPATCH_REQUEST_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.request+json";

/// Content type tag carried by encoded dispatch response messages.
///
/// Written under the `content_type` key by the response encoder and required
/// verbatim by the response schema.
pub const DISPATCH_RESPONSE_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.response+json";

/// Content type tag carried by encoded dispatch acknowledgement messages.
///
/// Written under the `content_type` key by the acknowledgement encoder and
/// required verbatim by the acknowledgement schema.
pub const DISPATCH_ACK_CONTENT_TYPE: &str = "application/vnd.aion.dispatch.ack+json";
12
13#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct DispatchRequest {
16 pub conversation_id: String,
18 pub request: ActivityRequest,
20}
21
22impl DispatchRequest {
23 pub const CONTENT_TYPE: &'static str = DISPATCH_REQUEST_CONTENT_TYPE;
25
26 #[must_use]
28 pub const fn new(conversation_id: String, request: ActivityRequest) -> Self {
29 Self {
30 conversation_id,
31 request,
32 }
33 }
34}
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct DispatchResponse {
39 pub worker_id: String,
41 pub result: ActivityResult,
43}
44
45impl DispatchResponse {
46 pub const CONTENT_TYPE: &'static str = DISPATCH_RESPONSE_CONTENT_TYPE;
48
49 #[must_use]
51 pub const fn new(worker_id: String, result: ActivityResult) -> Self {
52 Self { worker_id, result }
53 }
54}
55
/// Status value carried by a dispatch acknowledgement.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchAckStatus {
    /// Encoded on the wire as `"accepted"`.
    Accepted,
    /// Encoded on the wire as `"deferred"`.
    Deferred,
    /// Encoded on the wire as `"rejected"`.
    Rejected,
}

impl DispatchAckStatus {
    /// Returns the lowercase wire string for this status.
    ///
    /// The strings are the same set the acknowledgement schema lists in its
    /// `status` enum, so the two must be kept in sync when a variant is added.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Accepted => "accepted",
            Self::Deferred => "deferred",
            Self::Rejected => "rejected",
        }
    }
}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct DispatchAck {
82 pub conversation_id: String,
84 pub status: DispatchAckStatus,
86}
87
88impl DispatchAck {
89 pub const CONTENT_TYPE: &'static str = DISPATCH_ACK_CONTENT_TYPE;
91
92 #[must_use]
94 pub const fn new(conversation_id: String, status: DispatchAckStatus) -> Self {
95 Self {
96 conversation_id,
97 status,
98 }
99 }
100}
101
102pub fn dispatch_request_schema() -> Result<Schema, SchemaValidationError> {
108 typed_schema(
109 DISPATCH_REQUEST_CONTENT_TYPE,
110 &json!({
111 "conversation_id": {"type": "string", "minLength": 1},
112 "request": {"type": "object"},
113 "worker_id": false,
114 "result": false,
115 "status": false
116 }),
117 &["conversation_id", "request"],
118 )
119}
120
121pub fn dispatch_response_schema() -> Result<Schema, SchemaValidationError> {
127 typed_schema(
128 DISPATCH_RESPONSE_CONTENT_TYPE,
129 &json!({
130 "conversation_id": false,
131 "request": false,
132 "worker_id": {"type": "string", "minLength": 1},
133 "result": {"type": "object"},
134 "status": false
135 }),
136 &["worker_id", "result"],
137 )
138}
139
140pub fn dispatch_ack_schema() -> Result<Schema, SchemaValidationError> {
146 typed_schema(
147 DISPATCH_ACK_CONTENT_TYPE,
148 &json!({
149 "conversation_id": {"type": "string", "minLength": 1},
150 "request": false,
151 "worker_id": false,
152 "result": false,
153 "status": {"enum": ["accepted", "deferred", "rejected"]}
154 }),
155 &["conversation_id", "status"],
156 )
157}
158
159pub fn encode_dispatch_request(message: &DispatchRequest) -> Result<Vec<u8>, serde_json::Error> {
165 serde_json::to_vec(&json!({
166 "content_type": DispatchRequest::CONTENT_TYPE,
167 "conversation_id": message.conversation_id,
168 "request": activity_request_value(&message.request)
169 }))
170}
171
172pub fn encode_dispatch_response(message: &DispatchResponse) -> Result<Vec<u8>, serde_json::Error> {
178 serde_json::to_vec(&json!({
179 "content_type": DispatchResponse::CONTENT_TYPE,
180 "worker_id": message.worker_id,
181 "result": activity_result_value(&message.result)
182 }))
183}
184
185pub fn encode_dispatch_ack(message: &DispatchAck) -> Result<Vec<u8>, serde_json::Error> {
191 serde_json::to_vec(&json!({
192 "content_type": DispatchAck::CONTENT_TYPE,
193 "conversation_id": message.conversation_id,
194 "status": message.status.as_str()
195 }))
196}
197
198fn typed_schema(
199 content_type: &str,
200 properties: &Value,
201 required: &[&str],
202) -> Result<Schema, SchemaValidationError> {
203 let mut required_fields = vec!["content_type"];
204 required_fields.extend_from_slice(required);
205
206 Schema::new(json!({
207 "type": "object",
208 "properties": {
209 "content_type": {"const": content_type}
210 },
211 "required": required_fields,
212 "allOf": [{
213 "type": "object",
214 "properties": properties
215 }]
216 }))
217}
218
219fn activity_request_value(request: &ActivityRequest) -> Value {
220 json!({
221 "activity_type": request.activity_type,
222 "input": payload_value(&request.input),
223 "task_queue": request.task_queue,
224 "schedule_to_close_timeout_ms": request
225 .schedule_to_close_timeout
226 .map(|timeout| timeout.as_millis()),
227 "start_to_close_timeout_ms": request
228 .start_to_close_timeout
229 .map(|timeout| timeout.as_millis())
230 })
231}
232
233fn activity_result_value(result: &ActivityResult) -> Value {
234 match result {
235 ActivityResult::Completed { output } => json!({
236 "status": "completed",
237 "output": payload_value(output)
238 }),
239 ActivityResult::Failed { error } => json!({
240 "status": "failed",
241 "error": error.to_string()
242 }),
243 }
244}
245
246fn payload_value(payload: &Payload) -> Value {
247 json!({
248 "content_type": payload.content_type,
249 "data": payload.data
250 })
251}
252
#[cfg(test)]
mod tests {
    use std::error::Error;

    use super::{
        DISPATCH_ACK_CONTENT_TYPE, DISPATCH_REQUEST_CONTENT_TYPE, DISPATCH_RESPONSE_CONTENT_TYPE,
        DispatchAck, DispatchAckStatus, DispatchRequest, DispatchResponse,
        dispatch_request_schema, dispatch_response_schema, encode_dispatch_ack,
        encode_dispatch_request, encode_dispatch_response,
    };
    use crate::aion::{ActivityRequest, Payload, dispatch_channel};
    use crate::channel::{ChannelConfig, ChannelHandle, ChannelMode};
    use crate::error::LiminalError;

    /// Minimal activity request fixture shared by the tests below.
    fn activity_request() -> ActivityRequest {
        ActivityRequest {
            activity_type: "send-email".to_owned(),
            input: Payload {
                data: b"{}".to_vec(),
                content_type: "application/json".to_owned(),
            },
            task_queue: "email".to_owned(),
            schedule_to_close_timeout: None,
            start_to_close_timeout: None,
        }
    }

    #[test]
    fn typed_dispatch_messages_carry_content_types() {
        assert_eq!(DispatchRequest::CONTENT_TYPE, DISPATCH_REQUEST_CONTENT_TYPE);
        assert_eq!(
            DispatchResponse::CONTENT_TYPE,
            DISPATCH_RESPONSE_CONTENT_TYPE
        );
        assert_eq!(DispatchAck::CONTENT_TYPE, DISPATCH_ACK_CONTENT_TYPE);
        assert_eq!(DispatchAckStatus::Accepted.as_str(), "accepted");
    }

    #[test]
    fn request_schema_accepts_matching_content_type() -> Result<(), Box<dyn Error>> {
        let message = DispatchRequest::new("conversation-1".to_owned(), activity_request());
        let schema = dispatch_request_schema()?;
        let payload = encode_dispatch_request(&message)?;

        schema.validate(payload)?;
        Ok(())
    }

    #[test]
    fn channel_rejects_mismatched_dispatch_content_type() -> Result<(), Box<dyn Error>> {
        let channel_name = dispatch_channel("prod", "email")?;
        let config = ChannelConfig::new(
            String::from(channel_name),
            dispatch_request_schema()?,
            ChannelMode::Ephemeral,
        );
        let channel = ChannelHandle::new(config);
        // Right shape, wrong content type: the schema's `const` must reject it.
        let rejected = channel
            .publish(br#"{"content_type":"application/json","conversation_id":"c1","request":{}}"#);

        assert!(matches!(rejected, Err(LiminalError::SchemaMismatch { .. })));
        Ok(())
    }

    #[test]
    fn acknowledgement_encoding_uses_status_string() -> Result<(), Box<dyn Error>> {
        let ack = DispatchAck::new("conversation-1".to_owned(), DispatchAckStatus::Deferred);
        let encoded = encode_dispatch_ack(&ack)?;

        // Decode instead of substring-matching so the assertion pins the value
        // of each key rather than the mere presence of the text "deferred".
        let decoded: serde_json::Value = serde_json::from_slice(&encoded)?;

        assert_eq!(decoded["content_type"], DispatchAck::CONTENT_TYPE);
        assert_eq!(decoded["conversation_id"], "conversation-1");
        assert_eq!(decoded["status"], "deferred");
        Ok(())
    }

    #[test]
    fn request_schema_rejects_response_shape_even_with_request_content_type()
    -> Result<(), Box<dyn Error>> {
        let schema = dispatch_request_schema()?;
        let response = DispatchResponse::new(
            "worker-1".to_owned(),
            crate::aion::ActivityResult::Completed {
                output: Payload {
                    data: b"ok".to_vec(),
                    content_type: "application/octet-stream".to_owned(),
                },
            },
        );
        // Re-tag an encoded response with the request content type so that only
        // the shape constraints can cause the rejection.
        let mut payload: serde_json::Value =
            serde_json::from_slice(&encode_dispatch_response(&response)?)?;
        payload["content_type"] =
            serde_json::Value::String(DispatchRequest::CONTENT_TYPE.to_owned());

        let result = schema.validate(serde_json::to_vec(&payload)?);

        assert!(matches!(
            result,
            Err(crate::channel::SchemaValidationError::Mismatch { .. })
        ));
        Ok(())
    }

    #[test]
    fn response_schema_rejects_request_shape_even_with_response_content_type()
    -> Result<(), Box<dyn Error>> {
        let schema = dispatch_response_schema()?;
        let request = DispatchRequest::new("conversation-1".to_owned(), activity_request());
        // Re-tag an encoded request with the response content type so that only
        // the shape constraints can cause the rejection.
        let mut payload: serde_json::Value =
            serde_json::from_slice(&encode_dispatch_request(&request)?)?;
        payload["content_type"] =
            serde_json::Value::String(DispatchResponse::CONTENT_TYPE.to_owned());

        let result = schema.validate(serde_json::to_vec(&payload)?);

        assert!(matches!(
            result,
            Err(crate::channel::SchemaValidationError::Mismatch { .. })
        ));
        Ok(())
    }
}