1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{
9 ActivityError, ActivityId, Payload, RunId, ScheduleConfig, ScheduleId, SearchAttributeValue,
10 TimerId, WorkflowError, WorkflowId,
11};
12
13#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
15pub struct EventEnvelope {
16 pub seq: u64,
18 pub recorded_at: DateTime<Utc>,
23 pub workflow_id: WorkflowId,
25}
26
27#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
32#[serde(tag = "type", content = "data")]
33pub enum Event {
34 WorkflowStarted {
36 envelope: EventEnvelope,
38 workflow_type: String,
40 input: Payload,
42 run_id: RunId,
44 parent_run_id: Option<RunId>,
47 },
48 WorkflowCompleted {
50 envelope: EventEnvelope,
52 result: Payload,
54 },
55 WorkflowFailed {
57 envelope: EventEnvelope,
59 error: WorkflowError,
61 },
62 WorkflowCancelled {
64 envelope: EventEnvelope,
66 reason: String,
68 },
69 WorkflowTimedOut {
71 envelope: EventEnvelope,
73 timeout: String,
78 },
79 WorkflowContinuedAsNew {
82 envelope: EventEnvelope,
84 input: Payload,
86 workflow_type: Option<String>,
90 parent_run_id: RunId,
92 },
93 SearchAttributesUpdated {
95 envelope: EventEnvelope,
97 workflow_id: WorkflowId,
99 attributes: HashMap<String, SearchAttributeValue>,
101 },
102 ActivityScheduled {
104 envelope: EventEnvelope,
106 activity_id: ActivityId,
108 activity_type: String,
110 input: Payload,
112 },
113 ActivityStarted {
115 envelope: EventEnvelope,
117 activity_id: ActivityId,
119 },
120 ActivityCompleted {
122 envelope: EventEnvelope,
124 activity_id: ActivityId,
126 result: Payload,
128 },
129 ActivityFailed {
135 envelope: EventEnvelope,
137 activity_id: ActivityId,
139 error: ActivityError,
141 attempt: u32,
143 },
144 ActivityCancelled {
146 envelope: EventEnvelope,
148 activity_id: ActivityId,
150 },
151 TimerStarted {
153 envelope: EventEnvelope,
155 timer_id: TimerId,
157 fire_at: DateTime<Utc>,
159 },
160 TimerFired {
162 envelope: EventEnvelope,
164 timer_id: TimerId,
166 },
167 TimerCancelled {
169 envelope: EventEnvelope,
171 timer_id: TimerId,
173 },
174 WithTimeoutCompleted {
176 envelope: EventEnvelope,
178 timer_id: TimerId,
180 outcome: WithTimeoutOutcome,
182 result: Option<Payload>,
184 },
185 SignalReceived {
187 envelope: EventEnvelope,
189 name: String,
191 payload: Payload,
193 },
194 SignalSent {
196 envelope: EventEnvelope,
198 target_workflow_id: WorkflowId,
200 name: String,
202 payload: Payload,
204 },
205 ChildWorkflowStarted {
207 envelope: EventEnvelope,
209 child_workflow_id: WorkflowId,
211 workflow_type: String,
213 input: Payload,
215 },
216 ChildWorkflowCompleted {
218 envelope: EventEnvelope,
220 child_workflow_id: WorkflowId,
222 result: Payload,
224 },
225 ChildWorkflowFailed {
227 envelope: EventEnvelope,
229 child_workflow_id: WorkflowId,
231 error: WorkflowError,
233 },
234 ChildWorkflowCancelled {
236 envelope: EventEnvelope,
238 child_workflow_id: WorkflowId,
240 },
241 ScheduleCreated {
243 envelope: EventEnvelope,
245 schedule_id: ScheduleId,
247 config: ScheduleConfig,
249 },
250 ScheduleUpdated {
252 envelope: EventEnvelope,
254 schedule_id: ScheduleId,
256 config: ScheduleConfig,
258 },
259 SchedulePaused {
261 envelope: EventEnvelope,
263 schedule_id: ScheduleId,
265 },
266 ScheduleResumed {
268 envelope: EventEnvelope,
270 schedule_id: ScheduleId,
272 },
273 ScheduleDeleted {
275 envelope: EventEnvelope,
277 schedule_id: ScheduleId,
279 },
280 ScheduleTriggered {
282 envelope: EventEnvelope,
284 schedule_id: ScheduleId,
286 workflow_id: WorkflowId,
288 run_id: RunId,
290 },
291}
292
293#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
295pub enum WithTimeoutOutcome {
296 OperationCompleted,
298 TimedOut,
300}
301
302impl Event {
303 #[must_use]
305 pub const fn envelope(&self) -> &EventEnvelope {
306 match self {
307 Self::WorkflowStarted { envelope, .. }
308 | Self::WorkflowCompleted { envelope, .. }
309 | Self::WorkflowFailed { envelope, .. }
310 | Self::WorkflowCancelled { envelope, .. }
311 | Self::WorkflowTimedOut { envelope, .. }
312 | Self::WorkflowContinuedAsNew { envelope, .. }
313 | Self::SearchAttributesUpdated { envelope, .. }
314 | Self::ActivityScheduled { envelope, .. }
315 | Self::ActivityStarted { envelope, .. }
316 | Self::ActivityCompleted { envelope, .. }
317 | Self::ActivityFailed { envelope, .. }
318 | Self::ActivityCancelled { envelope, .. }
319 | Self::TimerStarted { envelope, .. }
320 | Self::TimerFired { envelope, .. }
321 | Self::TimerCancelled { envelope, .. }
322 | Self::WithTimeoutCompleted { envelope, .. }
323 | Self::SignalReceived { envelope, .. }
324 | Self::SignalSent { envelope, .. }
325 | Self::ChildWorkflowStarted { envelope, .. }
326 | Self::ChildWorkflowCompleted { envelope, .. }
327 | Self::ChildWorkflowFailed { envelope, .. }
328 | Self::ChildWorkflowCancelled { envelope, .. }
329 | Self::ScheduleCreated { envelope, .. }
330 | Self::ScheduleUpdated { envelope, .. }
331 | Self::SchedulePaused { envelope, .. }
332 | Self::ScheduleResumed { envelope, .. }
333 | Self::ScheduleDeleted { envelope, .. }
334 | Self::ScheduleTriggered { envelope, .. } => envelope,
335 }
336 }
337
338 #[must_use]
340 pub const fn seq(&self) -> u64 {
341 self.envelope().seq
342 }
343
344 #[must_use]
346 pub const fn recorded_at(&self) -> &DateTime<Utc> {
347 &self.envelope().recorded_at
348 }
349
350 #[must_use]
352 pub const fn workflow_id(&self) -> &WorkflowId {
353 &self.envelope().workflow_id
354 }
355}
356
357#[cfg(test)]
358mod tests {
359 use std::collections::HashMap;
360
361 use chrono::{DateTime, Utc};
362 use serde_json::json;
363
364 use super::{Event, EventEnvelope};
365 use crate::{
366 ActivityError, ActivityErrorKind, ActivityId, CatchUpPolicy, OverlapPolicy, Payload, RunId,
367 ScheduleConfig, ScheduleId, SearchAttributeValue, TimerId, TriggerSpec, WorkflowError,
368 WorkflowId,
369 };
370
371 fn recorded_at() -> DateTime<Utc> {
372 DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap_or_default()
373 }
374
375 fn envelope(seq: u64) -> EventEnvelope {
376 EventEnvelope {
377 seq,
378 recorded_at: recorded_at(),
379 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
380 }
381 }
382
383 fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
384 Payload::from_json(&json!({ "label": label }))
385 }
386
387 fn schedule_config(label: &str) -> Result<ScheduleConfig, crate::PayloadError> {
388 Ok(ScheduleConfig {
389 trigger: TriggerSpec::Cron {
390 expression: String::from("0 0 * * *"),
391 },
392 overlap_policy: OverlapPolicy::Skip,
393 catch_up_policy: CatchUpPolicy::One,
394 workflow_type: String::from("checkout"),
395 input: payload(label)?,
396 search_attributes: HashMap::from([(
397 String::from("aion.namespace"),
398 crate::SearchAttributeValue::String(String::from("tenant-a")),
399 )]),
400 })
401 }
402
403 fn workflow_error(message: &str) -> WorkflowError {
404 WorkflowError {
405 message: String::from(message),
406 details: None,
407 }
408 }
409
410 fn activity_error(kind: ActivityErrorKind, message: &str) -> ActivityError {
411 ActivityError {
412 kind,
413 message: String::from(message),
414 details: None,
415 }
416 }
417
418 fn round_trip(event: &Event) -> Result<(), serde_json::Error> {
419 let json = serde_json::to_string(event)?;
420 let decoded = serde_json::from_str::<Event>(&json)?;
421 assert_eq!(*event, decoded);
422 Ok(())
423 }
424
425 #[test]
426 fn event_accessors_return_envelope_fields() -> Result<(), Box<dyn std::error::Error>> {
427 let workflow_id = WorkflowId::new_v4();
428 let recorded_at = recorded_at();
429 let envelope = EventEnvelope {
430 seq: 17,
431 recorded_at,
432 workflow_id: workflow_id.clone(),
433 };
434 let event = Event::WorkflowStarted {
435 envelope,
436 workflow_type: String::from("checkout"),
437 input: payload("input")?,
438 run_id: RunId::new(uuid::Uuid::from_u128(1)),
439 parent_run_id: None,
440 };
441
442 assert_eq!(event.seq(), 17);
443 assert_eq!(event.recorded_at(), &recorded_at);
444 assert_eq!(event.workflow_id(), &workflow_id);
445 Ok(())
446 }
447
448 #[test]
449 fn events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
450 let child_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
451 let fire_at = DateTime::from_timestamp(1_700_000_100, 0).unwrap_or_default();
452 let events = vec![
453 Event::WorkflowStarted {
454 envelope: envelope(1),
455 workflow_type: String::from("checkout"),
456 input: payload("workflow-input")?,
457 run_id: RunId::new(uuid::Uuid::from_u128(1)),
458 parent_run_id: None,
459 },
460 Event::WorkflowCompleted {
461 envelope: envelope(2),
462 result: payload("workflow-result")?,
463 },
464 Event::WorkflowFailed {
465 envelope: envelope(3),
466 error: workflow_error("workflow failed"),
467 },
468 Event::WorkflowCancelled {
469 envelope: envelope(4),
470 reason: String::from("caller requested cancellation"),
471 },
472 Event::WorkflowTimedOut {
473 envelope: envelope(5),
474 timeout: String::from("execution"),
475 },
476 Event::ActivityScheduled {
477 envelope: envelope(6),
478 activity_id: ActivityId::from_sequence_position(6),
479 activity_type: String::from("charge-card"),
480 input: payload("activity-input")?,
481 },
482 Event::ActivityStarted {
483 envelope: envelope(7),
484 activity_id: ActivityId::from_sequence_position(6),
485 },
486 Event::ActivityCompleted {
487 envelope: envelope(8),
488 activity_id: ActivityId::from_sequence_position(6),
489 result: payload("activity-result")?,
490 },
491 Event::ActivityFailed {
492 envelope: envelope(9),
493 activity_id: ActivityId::from_sequence_position(6),
494 error: activity_error(ActivityErrorKind::Retryable, "temporary outage"),
495 attempt: 1,
496 },
497 Event::ActivityCancelled {
498 envelope: envelope(10),
499 activity_id: ActivityId::from_sequence_position(6),
500 },
501 Event::TimerStarted {
502 envelope: envelope(11),
503 timer_id: TimerId::anonymous(11),
504 fire_at,
505 },
506 Event::TimerFired {
507 envelope: envelope(12),
508 timer_id: TimerId::anonymous(11),
509 },
510 Event::TimerCancelled {
511 envelope: envelope(13),
512 timer_id: TimerId::named("reminder")?,
513 },
514 Event::SignalReceived {
515 envelope: envelope(14),
516 name: String::from("approve"),
517 payload: payload("signal")?,
518 },
519 Event::SignalSent {
520 envelope: envelope(15),
521 target_workflow_id: WorkflowId::new(uuid::Uuid::from_u128(5)),
522 name: String::from("approve"),
523 payload: payload("signal-sent")?,
524 },
525 Event::ChildWorkflowStarted {
526 envelope: envelope(16),
527 child_workflow_id: child_workflow_id.clone(),
528 workflow_type: String::from("fulfillment"),
529 input: payload("child-input")?,
530 },
531 Event::ChildWorkflowCompleted {
532 envelope: envelope(16),
533 child_workflow_id: child_workflow_id.clone(),
534 result: payload("child-result")?,
535 },
536 Event::ChildWorkflowFailed {
537 envelope: envelope(17),
538 child_workflow_id: child_workflow_id.clone(),
539 error: workflow_error("child failed"),
540 },
541 Event::ChildWorkflowCancelled {
542 envelope: envelope(18),
543 child_workflow_id,
544 },
545 ];
546
547 for event in events {
548 round_trip(&event)?;
549 }
550 Ok(())
551 }
552
553 #[test]
554 fn extended_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
555 let schedule_id = ScheduleId::new(uuid::Uuid::from_u128(2));
556 let triggered_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
557 let triggered_run_id = RunId::new(uuid::Uuid::from_u128(4));
558 let events = vec![
559 Event::WorkflowContinuedAsNew {
560 envelope: envelope(19),
561 input: payload("continued-input")?,
562 workflow_type: Some(String::from("checkout-v2")),
563 parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
564 },
565 Event::SearchAttributesUpdated {
566 envelope: envelope(20),
567 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
568 attributes: HashMap::from([(
569 String::from("customer_id"),
570 SearchAttributeValue::String(String::from("cust-123")),
571 )]),
572 },
573 Event::ScheduleCreated {
574 envelope: envelope(20),
575 schedule_id: schedule_id.clone(),
576 config: schedule_config("schedule-created")?,
577 },
578 Event::ScheduleUpdated {
579 envelope: envelope(21),
580 schedule_id: schedule_id.clone(),
581 config: schedule_config("schedule-updated")?,
582 },
583 Event::SchedulePaused {
584 envelope: envelope(22),
585 schedule_id: schedule_id.clone(),
586 },
587 Event::ScheduleResumed {
588 envelope: envelope(23),
589 schedule_id: schedule_id.clone(),
590 },
591 Event::ScheduleDeleted {
592 envelope: envelope(24),
593 schedule_id: schedule_id.clone(),
594 },
595 Event::ScheduleTriggered {
596 envelope: envelope(25),
597 schedule_id,
598 workflow_id: triggered_workflow_id,
599 run_id: triggered_run_id,
600 },
601 ];
602
603 for event in events {
604 round_trip(&event)?;
605 }
606 Ok(())
607 }
608}