1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{
9 ActivityError, ActivityId, PackageVersion, Payload, RunId, ScheduleConfig, ScheduleId,
10 SearchAttributeValue, TimerId, WorkflowError, WorkflowId,
11};
12
13#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
15pub struct EventEnvelope {
16 pub seq: u64,
18 pub recorded_at: DateTime<Utc>,
23 pub workflow_id: WorkflowId,
25}
26
27#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
32#[serde(tag = "type", content = "data")]
33pub enum Event {
34 WorkflowStarted {
36 envelope: EventEnvelope,
38 workflow_type: String,
40 input: Payload,
42 run_id: RunId,
44 parent_run_id: Option<RunId>,
47 package_version: PackageVersion,
52 },
53 WorkflowCompleted {
55 envelope: EventEnvelope,
57 result: Payload,
59 },
60 WorkflowFailed {
62 envelope: EventEnvelope,
64 error: WorkflowError,
66 },
67 WorkflowCancelled {
69 envelope: EventEnvelope,
71 reason: String,
73 },
74 WorkflowTimedOut {
76 envelope: EventEnvelope,
78 timeout: String,
83 },
84 WorkflowContinuedAsNew {
87 envelope: EventEnvelope,
89 input: Payload,
91 workflow_type: Option<String>,
95 parent_run_id: RunId,
97 },
98 SearchAttributesUpdated {
100 envelope: EventEnvelope,
102 workflow_id: WorkflowId,
104 attributes: HashMap<String, SearchAttributeValue>,
106 },
107 ActivityScheduled {
109 envelope: EventEnvelope,
111 activity_id: ActivityId,
113 activity_type: String,
115 input: Payload,
117 },
118 ActivityStarted {
120 envelope: EventEnvelope,
122 activity_id: ActivityId,
124 },
125 ActivityCompleted {
127 envelope: EventEnvelope,
129 activity_id: ActivityId,
131 result: Payload,
133 },
134 ActivityFailed {
140 envelope: EventEnvelope,
142 activity_id: ActivityId,
144 error: ActivityError,
146 attempt: u32,
148 },
149 ActivityCancelled {
151 envelope: EventEnvelope,
153 activity_id: ActivityId,
155 },
156 TimerStarted {
158 envelope: EventEnvelope,
160 timer_id: TimerId,
162 fire_at: DateTime<Utc>,
164 },
165 TimerFired {
167 envelope: EventEnvelope,
169 timer_id: TimerId,
171 },
172 TimerCancelled {
174 envelope: EventEnvelope,
176 timer_id: TimerId,
178 },
179 WithTimeoutCompleted {
181 envelope: EventEnvelope,
183 timer_id: TimerId,
185 outcome: WithTimeoutOutcome,
187 result: Option<Payload>,
189 },
190 SignalReceived {
192 envelope: EventEnvelope,
194 name: String,
196 payload: Payload,
198 },
199 SignalSent {
201 envelope: EventEnvelope,
203 target_workflow_id: WorkflowId,
205 name: String,
207 payload: Payload,
209 },
210 ChildWorkflowStarted {
212 envelope: EventEnvelope,
214 child_workflow_id: WorkflowId,
216 workflow_type: String,
218 input: Payload,
220 package_version: PackageVersion,
226 },
227 ChildWorkflowCompleted {
229 envelope: EventEnvelope,
231 child_workflow_id: WorkflowId,
233 result: Payload,
235 },
236 ChildWorkflowFailed {
238 envelope: EventEnvelope,
240 child_workflow_id: WorkflowId,
242 error: WorkflowError,
244 },
245 ChildWorkflowCancelled {
247 envelope: EventEnvelope,
249 child_workflow_id: WorkflowId,
251 },
252 ScheduleCreated {
254 envelope: EventEnvelope,
256 schedule_id: ScheduleId,
258 config: ScheduleConfig,
260 },
261 ScheduleUpdated {
263 envelope: EventEnvelope,
265 schedule_id: ScheduleId,
267 config: ScheduleConfig,
269 },
270 SchedulePaused {
272 envelope: EventEnvelope,
274 schedule_id: ScheduleId,
276 },
277 ScheduleResumed {
279 envelope: EventEnvelope,
281 schedule_id: ScheduleId,
283 },
284 ScheduleDeleted {
286 envelope: EventEnvelope,
288 schedule_id: ScheduleId,
290 },
291 ScheduleTriggered {
293 envelope: EventEnvelope,
295 schedule_id: ScheduleId,
297 workflow_id: WorkflowId,
299 run_id: RunId,
301 },
302}
303
304#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
306pub enum WithTimeoutOutcome {
307 OperationCompleted,
309 TimedOut,
311}
312
313impl Event {
314 #[must_use]
316 pub const fn envelope(&self) -> &EventEnvelope {
317 match self {
318 Self::WorkflowStarted { envelope, .. }
319 | Self::WorkflowCompleted { envelope, .. }
320 | Self::WorkflowFailed { envelope, .. }
321 | Self::WorkflowCancelled { envelope, .. }
322 | Self::WorkflowTimedOut { envelope, .. }
323 | Self::WorkflowContinuedAsNew { envelope, .. }
324 | Self::SearchAttributesUpdated { envelope, .. }
325 | Self::ActivityScheduled { envelope, .. }
326 | Self::ActivityStarted { envelope, .. }
327 | Self::ActivityCompleted { envelope, .. }
328 | Self::ActivityFailed { envelope, .. }
329 | Self::ActivityCancelled { envelope, .. }
330 | Self::TimerStarted { envelope, .. }
331 | Self::TimerFired { envelope, .. }
332 | Self::TimerCancelled { envelope, .. }
333 | Self::WithTimeoutCompleted { envelope, .. }
334 | Self::SignalReceived { envelope, .. }
335 | Self::SignalSent { envelope, .. }
336 | Self::ChildWorkflowStarted { envelope, .. }
337 | Self::ChildWorkflowCompleted { envelope, .. }
338 | Self::ChildWorkflowFailed { envelope, .. }
339 | Self::ChildWorkflowCancelled { envelope, .. }
340 | Self::ScheduleCreated { envelope, .. }
341 | Self::ScheduleUpdated { envelope, .. }
342 | Self::SchedulePaused { envelope, .. }
343 | Self::ScheduleResumed { envelope, .. }
344 | Self::ScheduleDeleted { envelope, .. }
345 | Self::ScheduleTriggered { envelope, .. } => envelope,
346 }
347 }
348
349 #[must_use]
351 pub const fn seq(&self) -> u64 {
352 self.envelope().seq
353 }
354
355 #[must_use]
357 pub const fn recorded_at(&self) -> &DateTime<Utc> {
358 &self.envelope().recorded_at
359 }
360
361 #[must_use]
363 pub const fn workflow_id(&self) -> &WorkflowId {
364 &self.envelope().workflow_id
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use std::collections::HashMap;
371
372 use chrono::{DateTime, Utc};
373 use serde_json::json;
374
375 use super::{Event, EventEnvelope};
376 use crate::{
377 ActivityError, ActivityErrorKind, ActivityId, CatchUpPolicy, OverlapPolicy, PackageVersion,
378 Payload, RunId, ScheduleConfig, ScheduleId, SearchAttributeValue, TimerId, TriggerSpec,
379 WorkflowError, WorkflowId,
380 };
381
382 fn package_version() -> PackageVersion {
383 PackageVersion::new("a".repeat(64))
384 }
385
386 fn recorded_at() -> DateTime<Utc> {
387 DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap_or_default()
388 }
389
390 fn envelope(seq: u64) -> EventEnvelope {
391 EventEnvelope {
392 seq,
393 recorded_at: recorded_at(),
394 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
395 }
396 }
397
398 fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
399 Payload::from_json(&json!({ "label": label }))
400 }
401
402 fn schedule_config(label: &str) -> Result<ScheduleConfig, crate::PayloadError> {
403 Ok(ScheduleConfig {
404 trigger: TriggerSpec::Cron {
405 expression: String::from("0 0 * * *"),
406 },
407 overlap_policy: OverlapPolicy::Skip,
408 catch_up_policy: CatchUpPolicy::One,
409 workflow_type: String::from("checkout"),
410 input: payload(label)?,
411 search_attributes: HashMap::from([(
412 String::from("aion.namespace"),
413 crate::SearchAttributeValue::String(String::from("tenant-a")),
414 )]),
415 })
416 }
417
418 fn workflow_error(message: &str) -> WorkflowError {
419 WorkflowError {
420 message: String::from(message),
421 details: None,
422 }
423 }
424
425 fn activity_error(kind: ActivityErrorKind, message: &str) -> ActivityError {
426 ActivityError {
427 kind,
428 message: String::from(message),
429 details: None,
430 }
431 }
432
433 fn round_trip(event: &Event) -> Result<(), serde_json::Error> {
434 let json = serde_json::to_string(event)?;
435 let decoded = serde_json::from_str::<Event>(&json)?;
436 assert_eq!(*event, decoded);
437 Ok(())
438 }
439
440 #[test]
441 fn event_accessors_return_envelope_fields() -> Result<(), Box<dyn std::error::Error>> {
442 let workflow_id = WorkflowId::new_v4();
443 let recorded_at = recorded_at();
444 let envelope = EventEnvelope {
445 seq: 17,
446 recorded_at,
447 workflow_id: workflow_id.clone(),
448 };
449 let event = Event::WorkflowStarted {
450 envelope,
451 workflow_type: String::from("checkout"),
452 input: payload("input")?,
453 run_id: RunId::new(uuid::Uuid::from_u128(1)),
454 parent_run_id: None,
455 package_version: package_version(),
456 };
457
458 assert_eq!(event.seq(), 17);
459 assert_eq!(event.recorded_at(), &recorded_at);
460 assert_eq!(event.workflow_id(), &workflow_id);
461 Ok(())
462 }
463
464 #[test]
465 fn events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
466 let fire_at = DateTime::from_timestamp(1_700_000_100, 0).unwrap_or_default();
467 let events = vec![
468 Event::WorkflowStarted {
469 envelope: envelope(1),
470 workflow_type: String::from("checkout"),
471 input: payload("workflow-input")?,
472 run_id: RunId::new(uuid::Uuid::from_u128(1)),
473 parent_run_id: None,
474 package_version: package_version(),
475 },
476 Event::WorkflowCompleted {
477 envelope: envelope(2),
478 result: payload("workflow-result")?,
479 },
480 Event::WorkflowFailed {
481 envelope: envelope(3),
482 error: workflow_error("workflow failed"),
483 },
484 Event::WorkflowCancelled {
485 envelope: envelope(4),
486 reason: String::from("caller requested cancellation"),
487 },
488 Event::WorkflowTimedOut {
489 envelope: envelope(5),
490 timeout: String::from("execution"),
491 },
492 Event::ActivityScheduled {
493 envelope: envelope(6),
494 activity_id: ActivityId::from_sequence_position(6),
495 activity_type: String::from("charge-card"),
496 input: payload("activity-input")?,
497 },
498 Event::ActivityStarted {
499 envelope: envelope(7),
500 activity_id: ActivityId::from_sequence_position(6),
501 },
502 Event::ActivityCompleted {
503 envelope: envelope(8),
504 activity_id: ActivityId::from_sequence_position(6),
505 result: payload("activity-result")?,
506 },
507 Event::ActivityFailed {
508 envelope: envelope(9),
509 activity_id: ActivityId::from_sequence_position(6),
510 error: activity_error(ActivityErrorKind::Retryable, "temporary outage"),
511 attempt: 1,
512 },
513 Event::ActivityCancelled {
514 envelope: envelope(10),
515 activity_id: ActivityId::from_sequence_position(6),
516 },
517 Event::TimerStarted {
518 envelope: envelope(11),
519 timer_id: TimerId::anonymous(11),
520 fire_at,
521 },
522 Event::TimerFired {
523 envelope: envelope(12),
524 timer_id: TimerId::anonymous(11),
525 },
526 Event::TimerCancelled {
527 envelope: envelope(13),
528 timer_id: TimerId::named("reminder")?,
529 },
530 Event::SignalReceived {
531 envelope: envelope(14),
532 name: String::from("approve"),
533 payload: payload("signal")?,
534 },
535 Event::SignalSent {
536 envelope: envelope(15),
537 target_workflow_id: WorkflowId::new(uuid::Uuid::from_u128(5)),
538 name: String::from("approve"),
539 payload: payload("signal-sent")?,
540 },
541 ];
542
543 for event in events {
544 round_trip(&event)?;
545 }
546 Ok(())
547 }
548
549 #[test]
550 fn child_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
551 let child_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
552 let events = vec![
553 Event::ChildWorkflowStarted {
554 envelope: envelope(16),
555 child_workflow_id: child_workflow_id.clone(),
556 workflow_type: String::from("fulfillment"),
557 input: payload("child-input")?,
558 package_version: package_version(),
559 },
560 Event::ChildWorkflowCompleted {
561 envelope: envelope(16),
562 child_workflow_id: child_workflow_id.clone(),
563 result: payload("child-result")?,
564 },
565 Event::ChildWorkflowFailed {
566 envelope: envelope(17),
567 child_workflow_id: child_workflow_id.clone(),
568 error: workflow_error("child failed"),
569 },
570 Event::ChildWorkflowCancelled {
571 envelope: envelope(18),
572 child_workflow_id,
573 },
574 ];
575
576 for event in events {
577 round_trip(&event)?;
578 }
579 Ok(())
580 }
581
582 #[test]
583 fn extended_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
584 let schedule_id = ScheduleId::new(uuid::Uuid::from_u128(2));
585 let triggered_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
586 let triggered_run_id = RunId::new(uuid::Uuid::from_u128(4));
587 let events = vec![
588 Event::WorkflowContinuedAsNew {
589 envelope: envelope(19),
590 input: payload("continued-input")?,
591 workflow_type: Some(String::from("checkout-v2")),
592 parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
593 },
594 Event::SearchAttributesUpdated {
595 envelope: envelope(20),
596 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
597 attributes: HashMap::from([(
598 String::from("customer_id"),
599 SearchAttributeValue::String(String::from("cust-123")),
600 )]),
601 },
602 Event::ScheduleCreated {
603 envelope: envelope(20),
604 schedule_id: schedule_id.clone(),
605 config: schedule_config("schedule-created")?,
606 },
607 Event::ScheduleUpdated {
608 envelope: envelope(21),
609 schedule_id: schedule_id.clone(),
610 config: schedule_config("schedule-updated")?,
611 },
612 Event::SchedulePaused {
613 envelope: envelope(22),
614 schedule_id: schedule_id.clone(),
615 },
616 Event::ScheduleResumed {
617 envelope: envelope(23),
618 schedule_id: schedule_id.clone(),
619 },
620 Event::ScheduleDeleted {
621 envelope: envelope(24),
622 schedule_id: schedule_id.clone(),
623 },
624 Event::ScheduleTriggered {
625 envelope: envelope(25),
626 schedule_id,
627 workflow_id: triggered_workflow_id,
628 run_id: triggered_run_id,
629 },
630 ];
631
632 for event in events {
633 round_trip(&event)?;
634 }
635 Ok(())
636 }
637}