1use std::collections::{BTreeMap, BTreeSet};
4use std::future::Future;
5use std::time::Duration;
6
7use aion_core::{ActivityError, ActivityId, Payload, WorkflowId};
8use tracing::{debug, error, warn};
9use uuid::Uuid;
10
11use crate::config::WorkerConfig;
12use crate::error::WorkerError;
13use crate::protocol::{GrpcWorkerSession, WorkerSession};
14
15#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum PendingActivityReport {
18 Completed {
20 workflow_id: WorkflowId,
22 activity_id: ActivityId,
24 output: Payload,
26 },
27 Failed {
29 workflow_id: WorkflowId,
31 activity_id: ActivityId,
33 failure: ActivityError,
35 },
36}
37
38impl PendingActivityReport {
39 #[must_use]
41 pub const fn activity_id(&self) -> &ActivityId {
42 match self {
43 Self::Completed { activity_id, .. } | Self::Failed { activity_id, .. } => activity_id,
44 }
45 }
46
47 #[must_use]
49 pub const fn workflow_id(&self) -> &WorkflowId {
50 match self {
51 Self::Completed { workflow_id, .. } | Self::Failed { workflow_id, .. } => workflow_id,
52 }
53 }
54}
55
56type PendingReportKey = (Uuid, u64);
60
61fn pending_report_key(workflow_id: &WorkflowId, activity_id: &ActivityId) -> PendingReportKey {
62 (workflow_id.as_uuid(), activity_id.sequence_position())
63}
64
65#[derive(Clone, Debug, Default, PartialEq, Eq)]
67pub struct UnackedResultTracker {
68 reports: BTreeMap<PendingReportKey, PendingActivityReport>,
69}
70
71impl UnackedResultTracker {
72 #[must_use]
74 pub const fn new() -> Self {
75 Self {
76 reports: BTreeMap::new(),
77 }
78 }
79
80 pub fn record(&mut self, report: PendingActivityReport) {
83 let key = pending_report_key(report.workflow_id(), report.activity_id());
84 self.reports.insert(key, report);
85 }
86
87 pub fn acknowledge(
89 &mut self,
90 workflow_id: &WorkflowId,
91 activity_id: &ActivityId,
92 ) -> Option<PendingActivityReport> {
93 self.reports
94 .remove(&pending_report_key(workflow_id, activity_id))
95 }
96
97 #[must_use]
99 pub fn len(&self) -> usize {
100 self.reports.len()
101 }
102
103 #[must_use]
105 pub fn is_empty(&self) -> bool {
106 self.reports.is_empty()
107 }
108
109 #[must_use]
111 pub fn get(
112 &self,
113 workflow_id: &WorkflowId,
114 activity_id: &ActivityId,
115 ) -> Option<&PendingActivityReport> {
116 self.reports
117 .get(&pending_report_key(workflow_id, activity_id))
118 }
119
120 #[must_use]
122 pub fn snapshot(&self) -> Vec<PendingActivityReport> {
123 self.reports.values().cloned().collect()
124 }
125}
126
127#[derive(Clone, Debug, PartialEq, Eq)]
129pub struct ReconnectBackoff {
130 initial: Duration,
131 max: Duration,
132 attempts: usize,
133}
134
135impl ReconnectBackoff {
136 pub fn from_config(config: &WorkerConfig) -> Result<Self, WorkerError> {
142 if config.reconnect.initial_backoff.is_zero() {
143 return Err(WorkerError::registration(InvalidReconnectBackoff {
144 message: String::from("reconnect initial_backoff must be greater than zero"),
145 }));
146 }
147 if config.reconnect.max_backoff.is_zero() {
148 return Err(WorkerError::registration(InvalidReconnectBackoff {
149 message: String::from("reconnect max_backoff must be greater than zero"),
150 }));
151 }
152 if config.reconnect.max_attempts == 0 {
153 return Err(WorkerError::registration(InvalidReconnectBackoff {
154 message: String::from("reconnect max_attempts must be greater than zero"),
155 }));
156 }
157 Ok(Self {
158 initial: config.reconnect.initial_backoff,
159 max: config.reconnect.max_backoff,
160 attempts: config.reconnect.max_attempts,
161 })
162 }
163
164 #[must_use]
169 pub fn delay_for_attempt(&self, completed_failures: usize) -> Duration {
170 let bounded_shift = completed_failures.saturating_sub(1).min(31);
171 let shift = u32::try_from(bounded_shift).map_or(31, |shift| shift);
172 let factor = 1_u32.checked_shl(shift).map_or(u32::MAX, |factor| factor);
173 self.initial.saturating_mul(factor).min(self.max)
174 }
175
176 #[must_use]
178 pub const fn attempts(&self) -> usize {
179 self.attempts
180 }
181
182 #[must_use]
190 pub const fn max_delay(&self) -> Duration {
191 self.max
192 }
193}
194
195pub async fn connect_registered_grpc_session(
201 config: &WorkerConfig,
202 activity_types: Vec<String>,
203 available_handlers: &BTreeSet<String>,
204) -> Result<GrpcWorkerSession, WorkerError> {
205 let session = GrpcWorkerSession::connect(config.clone()).await?;
206 register_connected_session(session, config, activity_types, available_handlers).await
207}
208
209pub async fn register_connected_session<S>(
215 mut session: S,
216 config: &WorkerConfig,
217 activity_types: Vec<String>,
218 available_handlers: &BTreeSet<String>,
219) -> Result<S, WorkerError>
220where
221 S: WorkerSession,
222{
223 session.handshake(config).await?;
224 session.register(activity_types, available_handlers).await?;
225 Ok(session)
226}
227
228pub async fn reconnect_with_backoff<S, F, Fut>(
237 config: &WorkerConfig,
238 activity_types: Vec<String>,
239 available_handlers: &BTreeSet<String>,
240 connect: F,
241) -> Result<S, WorkerError>
242where
243 S: WorkerSession,
244 F: FnMut() -> Fut,
245 Fut: Future<Output = Result<S, WorkerError>>,
246{
247 reconnect_with_sleep(
248 config,
249 activity_types,
250 available_handlers,
251 connect,
252 tokio::time::sleep,
253 )
254 .await
255}
256
257pub async fn reconnect_with_sleep<S, F, Fut, Sleep, SleepFut>(
267 config: &WorkerConfig,
268 activity_types: Vec<String>,
269 available_handlers: &BTreeSet<String>,
270 mut connect: F,
271 mut sleep: Sleep,
272) -> Result<S, WorkerError>
273where
274 S: WorkerSession,
275 F: FnMut() -> Fut,
276 Fut: Future<Output = Result<S, WorkerError>>,
277 Sleep: FnMut(Duration) -> SleepFut,
278 SleepFut: Future<Output = ()>,
279{
280 let backoff = ReconnectBackoff::from_config(config)?;
281
282 for attempt in 1..=backoff.attempts() {
283 debug!(attempt, "attempting worker reconnect");
284 let result = match connect().await {
285 Ok(session) => {
286 register_connected_session(
287 session,
288 config,
289 activity_types.clone(),
290 available_handlers,
291 )
292 .await
293 }
294 Err(error) => Err(error),
295 };
296
297 match result {
298 Ok(session) => {
299 debug!(attempt, "worker reconnect succeeded");
300 return Ok(session);
301 }
302 Err(error) => {
303 if !error.is_retryable() {
304 error!(
305 attempt,
306 error = %error,
307 "worker reconnect denied by server; not retrying"
308 );
309 return Err(error);
310 }
311 if attempt == backoff.attempts() {
312 error!(attempt, error = %error, "worker reconnect attempts exhausted");
313 return Err(error);
314 }
315 let delay = backoff.delay_for_attempt(attempt);
316 warn!(
317 attempt,
318 delay_ms = delay.as_millis(),
319 error = %error,
320 "worker reconnect failed; backing off"
321 );
322 sleep(delay).await;
323 }
324 }
325 }
326
327 Err(WorkerError::registration(InvalidReconnectBackoff {
328 message: String::from("reconnect_max_attempts must be greater than zero"),
329 }))
330}
331
332pub async fn re_report_unacked<S>(
343 tracker: &UnackedResultTracker,
344 session: &mut S,
345) -> Result<(), WorkerError>
346where
347 S: WorkerSession,
348{
349 for report in tracker.snapshot() {
350 match report {
351 PendingActivityReport::Completed {
352 workflow_id,
353 activity_id,
354 output,
355 } => {
356 debug!(
357 workflow_id = %workflow_id,
358 activity_id = activity_id.sequence_position(),
359 "re-reporting unacknowledged activity result"
360 );
361 session
362 .report_result(workflow_id, activity_id, output)
363 .await?;
364 }
365 PendingActivityReport::Failed {
366 workflow_id,
367 activity_id,
368 failure,
369 } => {
370 debug!(
371 workflow_id = %workflow_id,
372 activity_id = activity_id.sequence_position(),
373 "re-reporting unacknowledged activity failure"
374 );
375 session
376 .report_failure(workflow_id, activity_id, failure)
377 .await?;
378 }
379 }
380 }
381 Ok(())
382}
383
384#[derive(Debug, thiserror::Error)]
385#[error("{message}")]
386struct InvalidReconnectBackoff {
387 message: String,
388}
389
390#[cfg(test)]
391mod tests {
392 use std::cell::RefCell;
393 use std::collections::BTreeSet;
394 use std::rc::Rc;
395 use std::time::Duration;
396
397 use aion_core::{
398 ActivityError, ActivityErrorKind, ActivityId, ContentType, Payload, WorkflowId,
399 };
400 use async_trait::async_trait;
401 use futures::stream;
402
403 use super::{
404 PendingActivityReport, UnackedResultTracker, re_report_unacked, reconnect_with_sleep,
405 };
406 use crate::error::WorkerError;
407 use crate::protocol::{
408 WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
409 };
410 use crate::{ReconnectConfig, WorkerConfig};
411
412 #[test]
413 fn tracker_records_reports_and_acknowledges_by_workflow_and_activity_id() {
414 let workflow_id = WorkflowId::new_v4();
415 let first_id = ActivityId::from_sequence_position(1);
416 let second_id = ActivityId::from_sequence_position(2);
417 let mut tracker = UnackedResultTracker::new();
418
419 tracker.record(PendingActivityReport::Completed {
420 workflow_id: workflow_id.clone(),
421 activity_id: first_id.clone(),
422 output: Payload::new(ContentType::Json, b"{\"first\":true}".to_vec()),
423 });
424 tracker.record(PendingActivityReport::Completed {
425 workflow_id: workflow_id.clone(),
426 activity_id: second_id.clone(),
427 output: Payload::new(ContentType::Json, b"{\"second\":true}".to_vec()),
428 });
429
430 assert_eq!(tracker.len(), 2);
431 assert!(tracker.acknowledge(&workflow_id, &first_id).is_some());
432 assert_eq!(tracker.len(), 1);
433 assert!(tracker.get(&workflow_id, &second_id).is_some());
434 assert!(tracker.get(&workflow_id, &first_id).is_none());
435 }
436
437 #[test]
438 fn tracker_keeps_reports_for_distinct_workflows_at_the_same_sequence_position() {
439 let first_workflow = WorkflowId::new_v4();
440 let second_workflow = WorkflowId::new_v4();
441 let activity_id = ActivityId::from_sequence_position(3);
442 let mut tracker = UnackedResultTracker::new();
443
444 tracker.record(PendingActivityReport::Completed {
445 workflow_id: first_workflow.clone(),
446 activity_id: activity_id.clone(),
447 output: Payload::new(ContentType::Json, b"{\"workflow\":\"a\"}".to_vec()),
448 });
449 tracker.record(PendingActivityReport::Completed {
450 workflow_id: second_workflow.clone(),
451 activity_id: activity_id.clone(),
452 output: Payload::new(ContentType::Json, b"{\"workflow\":\"b\"}".to_vec()),
453 });
454
455 assert_eq!(tracker.len(), 2);
456 assert!(tracker.get(&first_workflow, &activity_id).is_some());
457 assert!(tracker.get(&second_workflow, &activity_id).is_some());
458 assert!(
459 tracker.acknowledge(&first_workflow, &activity_id).is_some(),
460 "acknowledging one workflow's report must not require the other's"
461 );
462 assert_eq!(tracker.len(), 1);
463 assert!(tracker.get(&second_workflow, &activity_id).is_some());
464 }
465
466 #[tokio::test]
467 async fn reconnect_fails_once_then_handshakes_and_registers() -> Result<(), WorkerError> {
468 let config = test_config();
469 let attempts = Rc::new(RefCell::new(0usize));
470 let sleeps = Rc::new(RefCell::new(Vec::new()));
471 let activity_types = vec![String::from("charge-card")];
472 let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
473 let attempts_for_connect = Rc::clone(&attempts);
474 let sleeps_for_sleep = Rc::clone(&sleeps);
475
476 let session = reconnect_with_sleep(
477 &config,
478 activity_types.clone(),
479 &handlers,
480 move || {
481 let attempts_for_connect = Rc::clone(&attempts_for_connect);
482 async move {
483 let mut attempts = attempts_for_connect.borrow_mut();
484 *attempts += 1;
485 if *attempts == 1 {
486 Err(WorkerError::Transport {
487 source: tonic::Status::unavailable("disconnected"),
488 })
489 } else {
490 Ok(ReconnectFakeSession::default())
491 }
492 }
493 },
494 move |delay| {
495 let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
496 async move {
497 sleeps_for_sleep.borrow_mut().push(delay);
498 }
499 },
500 )
501 .await?;
502
503 assert_eq!(*attempts.borrow(), 2);
504 assert_eq!(*sleeps.borrow(), vec![Duration::from_millis(5)]);
505 assert_eq!(session.handshakes, vec![String::from("worker-a")]);
506 assert_eq!(session.registrations, vec![activity_types]);
507 Ok(())
508 }
509
510 #[tokio::test]
511 async fn permission_denied_registration_stops_after_one_attempt() {
512 let config = test_config();
513 let attempts = Rc::new(RefCell::new(0usize));
514 let sleeps = Rc::new(RefCell::new(Vec::new()));
515 let activity_types = vec![String::from("charge-card")];
516 let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
517 let attempts_for_connect = Rc::clone(&attempts);
518 let sleeps_for_sleep = Rc::clone(&sleeps);
519
520 let result = reconnect_with_sleep(
521 &config,
522 activity_types,
523 &handlers,
524 move || {
525 let attempts_for_connect = Rc::clone(&attempts_for_connect);
526 async move {
527 *attempts_for_connect.borrow_mut() += 1;
528 Ok(DeniedRegistrationSession {
529 denial: tonic::Status::permission_denied(
530 "namespace `payments` is not granted to subject `worker-a`",
531 ),
532 })
533 }
534 },
535 move |delay| {
536 let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
537 async move {
538 sleeps_for_sleep.borrow_mut().push(delay);
539 }
540 },
541 )
542 .await;
543
544 assert!(result.is_err());
545 let Err(error) = result else { return };
546 assert_eq!(*attempts.borrow(), 1);
547 assert!(sleeps.borrow().is_empty());
548 assert!(!error.is_retryable());
549 assert!(matches!(
550 error.grpc_status().map(tonic::Status::code),
551 Some(tonic::Code::PermissionDenied)
552 ));
553 assert_eq!(
554 error.grpc_status().map(tonic::Status::message),
555 Some("namespace `payments` is not granted to subject `worker-a`")
556 );
557 assert!(
558 error
559 .to_string()
560 .contains("namespace `payments` is not granted to subject `worker-a`")
561 );
562 }
563
564 #[tokio::test]
565 async fn unauthenticated_handshake_stops_after_one_attempt() {
566 let config = test_config();
567 let attempts = Rc::new(RefCell::new(0usize));
568 let sleeps = Rc::new(RefCell::new(Vec::new()));
569 let activity_types = vec![String::from("charge-card")];
570 let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
571 let attempts_for_connect = Rc::clone(&attempts);
572 let sleeps_for_sleep = Rc::clone(&sleeps);
573
574 let result = reconnect_with_sleep(
575 &config,
576 activity_types,
577 &handlers,
578 move || {
579 let attempts_for_connect = Rc::clone(&attempts_for_connect);
580 async move {
581 *attempts_for_connect.borrow_mut() += 1;
582 Err::<ReconnectFakeSession, _>(WorkerError::Handshake {
583 source: tonic::Status::unauthenticated("worker credentials were rejected"),
584 })
585 }
586 },
587 move |delay| {
588 let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
589 async move {
590 sleeps_for_sleep.borrow_mut().push(delay);
591 }
592 },
593 )
594 .await;
595
596 assert!(result.is_err());
597 let Err(error) = result else { return };
598 assert_eq!(*attempts.borrow(), 1);
599 assert!(sleeps.borrow().is_empty());
600 assert!(!error.is_retryable());
601 assert!(matches!(
602 error.grpc_status().map(tonic::Status::code),
603 Some(tonic::Code::Unauthenticated)
604 ));
605 assert!(
606 error
607 .to_string()
608 .contains("worker credentials were rejected")
609 );
610 }
611
612 #[tokio::test]
613 async fn unavailable_transport_retries_until_attempts_exhausted() {
614 let config = test_config();
615 let attempts = Rc::new(RefCell::new(0usize));
616 let sleeps = Rc::new(RefCell::new(Vec::new()));
617 let activity_types = vec![String::from("charge-card")];
618 let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
619 let attempts_for_connect = Rc::clone(&attempts);
620 let sleeps_for_sleep = Rc::clone(&sleeps);
621
622 let result = reconnect_with_sleep(
623 &config,
624 activity_types,
625 &handlers,
626 move || {
627 let attempts_for_connect = Rc::clone(&attempts_for_connect);
628 async move {
629 *attempts_for_connect.borrow_mut() += 1;
630 Err::<ReconnectFakeSession, _>(WorkerError::Transport {
631 source: tonic::Status::unavailable("engine unreachable"),
632 })
633 }
634 },
635 move |delay| {
636 let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
637 async move {
638 sleeps_for_sleep.borrow_mut().push(delay);
639 }
640 },
641 )
642 .await;
643
644 assert!(result.is_err());
645 let Err(error) = result else { return };
646 assert_eq!(*attempts.borrow(), 3);
647 assert_eq!(
648 *sleeps.borrow(),
649 vec![Duration::from_millis(5), Duration::from_millis(10)]
650 );
651 assert!(error.is_retryable());
652 assert!(matches!(
653 error.grpc_status().map(tonic::Status::code),
654 Some(tonic::Code::Unavailable)
655 ));
656 }
657
658 #[tokio::test]
659 async fn re_reports_unacked_reports_without_removing_them() -> Result<(), WorkerError> {
660 let workflow_id = WorkflowId::new_v4();
661 let activity_id = ActivityId::from_sequence_position(7);
662 let output = Payload::new(ContentType::Json, b"{}".to_vec());
663 let mut tracker = UnackedResultTracker::new();
664 tracker.record(PendingActivityReport::Completed {
665 workflow_id: workflow_id.clone(),
666 activity_id: activity_id.clone(),
667 output: output.clone(),
668 });
669 let mut session = ReconnectFakeSession::default();
670
671 re_report_unacked(&tracker, &mut session).await?;
672
673 assert_eq!(tracker.len(), 1);
674 assert_eq!(
675 session.reports,
676 vec![RecordedReport::Completed(workflow_id, activity_id, output)]
677 );
678 Ok(())
679 }
680
681 #[derive(Default)]
682 struct ReconnectFakeSession {
683 handshakes: Vec<String>,
684 registrations: Vec<Vec<String>>,
685 reports: Vec<RecordedReport>,
686 }
687
688 struct DeniedRegistrationSession {
692 denial: tonic::Status,
693 }
694
695 #[async_trait]
696 impl WorkerSession for DeniedRegistrationSession {
697 async fn handshake(&mut self, _config: &WorkerConfig) -> Result<(), WorkerError> {
698 Ok(())
699 }
700
701 async fn register(
702 &mut self,
703 activity_types: Vec<String>,
704 available_handlers: &BTreeSet<String>,
705 ) -> Result<(), WorkerError> {
706 validate_activity_handlers(&activity_types, available_handlers)?;
707 Err(WorkerError::Registration {
708 source: Box::new(self.denial.clone()),
709 })
710 }
711
712 fn receive_tasks(&mut self) -> WorkerTaskStream {
713 Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
714 }
715
716 async fn report_result(
717 &mut self,
718 workflow_id: WorkflowId,
719 activity_id: ActivityId,
720 result: Payload,
721 ) -> Result<(), WorkerError> {
722 drop((workflow_id, activity_id, result));
723 Err(WorkerError::Registration {
724 source: Box::new(self.denial.clone()),
725 })
726 }
727
728 async fn report_failure(
729 &mut self,
730 workflow_id: WorkflowId,
731 activity_id: ActivityId,
732 failure: ActivityError,
733 ) -> Result<(), WorkerError> {
734 drop((workflow_id, activity_id, failure));
735 Err(WorkerError::Registration {
736 source: Box::new(self.denial.clone()),
737 })
738 }
739
740 async fn send_heartbeat(
741 &mut self,
742 workflow_id: WorkflowId,
743 activity_id: ActivityId,
744 progress: Option<Payload>,
745 ) -> Result<(), WorkerError> {
746 drop((workflow_id, activity_id, progress));
747 Err(WorkerError::Registration {
748 source: Box::new(self.denial.clone()),
749 })
750 }
751 }
752
753 #[derive(Clone, Debug, PartialEq, Eq)]
754 enum RecordedReport {
755 Completed(WorkflowId, ActivityId, Payload),
756 Failed(WorkflowId, ActivityId, ActivityError),
757 }
758
759 #[async_trait]
760 impl WorkerSession for ReconnectFakeSession {
761 async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
762 self.handshakes.push(config.identity.clone());
763 Ok(())
764 }
765
766 async fn register(
767 &mut self,
768 activity_types: Vec<String>,
769 available_handlers: &BTreeSet<String>,
770 ) -> Result<(), WorkerError> {
771 validate_activity_handlers(&activity_types, available_handlers)?;
772 self.registrations.push(activity_types);
773 Ok(())
774 }
775
776 fn receive_tasks(&mut self) -> WorkerTaskStream {
777 Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
778 }
779
780 async fn report_result(
781 &mut self,
782 workflow_id: WorkflowId,
783 activity_id: ActivityId,
784 result: Payload,
785 ) -> Result<(), WorkerError> {
786 self.reports
787 .push(RecordedReport::Completed(workflow_id, activity_id, result));
788 Ok(())
789 }
790
791 async fn report_failure(
792 &mut self,
793 workflow_id: WorkflowId,
794 activity_id: ActivityId,
795 failure: ActivityError,
796 ) -> Result<(), WorkerError> {
797 self.reports
798 .push(RecordedReport::Failed(workflow_id, activity_id, failure));
799 Ok(())
800 }
801
802 async fn send_heartbeat(
803 &mut self,
804 workflow_id: WorkflowId,
805 activity_id: ActivityId,
806 progress: Option<Payload>,
807 ) -> Result<(), WorkerError> {
808 drop((workflow_id, activity_id, progress));
809 Ok(())
810 }
811 }
812
813 fn test_config() -> WorkerConfig {
814 WorkerConfig::new(
815 "http://127.0.0.1:50051",
816 "payments",
817 "worker-a",
818 2,
819 ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 3),
820 None,
821 )
822 }
823
824 fn terminal_failure() -> ActivityError {
825 ActivityError {
826 kind: ActivityErrorKind::Terminal,
827 message: String::from("terminal"),
828 details: None,
829 }
830 }
831
832 #[test]
833 fn tracker_replaces_existing_activity_report() {
834 let workflow_id = WorkflowId::new_v4();
835 let activity_id = ActivityId::from_sequence_position(9);
836 let mut tracker = UnackedResultTracker::new();
837 tracker.record(PendingActivityReport::Completed {
838 workflow_id: workflow_id.clone(),
839 activity_id: activity_id.clone(),
840 output: Payload::new(ContentType::Json, b"{}".to_vec()),
841 });
842 tracker.record(PendingActivityReport::Failed {
843 workflow_id: workflow_id.clone(),
844 activity_id: activity_id.clone(),
845 failure: terminal_failure(),
846 });
847
848 assert_eq!(tracker.len(), 1);
849 assert!(matches!(
850 tracker.get(&workflow_id, &activity_id),
851 Some(PendingActivityReport::Failed { .. })
852 ));
853 }
854}