1use std::collections::{BTreeMap, BTreeSet};
4use std::future::Future;
5use std::time::Duration;
6
7use aion_core::{ActivityError, ActivityId, Payload, WorkflowId};
8use tracing::{debug, error, warn};
9use uuid::Uuid;
10
11use crate::config::WorkerConfig;
12use crate::error::WorkerError;
13use crate::protocol::{GrpcWorkerSession, WorkerSession};
14
/// An activity outcome that has been produced but not yet acknowledged.
///
/// [`UnackedResultTracker`] stores these reports and [`re_report_unacked`] replays them through a
/// [`WorkerSession`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PendingActivityReport {
    /// The activity finished and produced an output payload.
    Completed {
        /// Workflow identifier carried with the report.
        workflow_id: WorkflowId,
        /// Activity identifier carried with the report.
        activity_id: ActivityId,
        /// Payload handed to `WorkerSession::report_result` when the report is re-sent.
        output: Payload,
    },
    /// The activity finished with an error.
    Failed {
        /// Workflow identifier carried with the report.
        workflow_id: WorkflowId,
        /// Activity identifier carried with the report.
        activity_id: ActivityId,
        /// Error handed to `WorkerSession::report_failure` when the report is re-sent.
        failure: ActivityError,
    },
}
37
38impl PendingActivityReport {
39 #[must_use]
41 pub const fn activity_id(&self) -> &ActivityId {
42 match self {
43 Self::Completed { activity_id, .. } | Self::Failed { activity_id, .. } => activity_id,
44 }
45 }
46
47 #[must_use]
49 pub const fn workflow_id(&self) -> &WorkflowId {
50 match self {
51 Self::Completed { workflow_id, .. } | Self::Failed { workflow_id, .. } => workflow_id,
52 }
53 }
54}
55
56type PendingReportKey = (Uuid, u64);
60
61fn pending_report_key(workflow_id: &WorkflowId, activity_id: &ActivityId) -> PendingReportKey {
62 (workflow_id.as_uuid(), activity_id.sequence_position())
63}
64
/// In-memory collection of activity reports that have not been acknowledged yet.
///
/// Reports are keyed by workflow UUID and activity sequence position, so at most one report is
/// kept per workflow/activity pair.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct UnackedResultTracker {
    /// Pending reports, ordered by their `(workflow UUID, sequence position)` key.
    reports: BTreeMap<PendingReportKey, PendingActivityReport>,
}
70
71impl UnackedResultTracker {
72 #[must_use]
74 pub const fn new() -> Self {
75 Self {
76 reports: BTreeMap::new(),
77 }
78 }
79
80 pub fn record(&mut self, report: PendingActivityReport) {
83 let key = pending_report_key(report.workflow_id(), report.activity_id());
84 self.reports.insert(key, report);
85 }
86
87 pub fn acknowledge(
89 &mut self,
90 workflow_id: &WorkflowId,
91 activity_id: &ActivityId,
92 ) -> Option<PendingActivityReport> {
93 self.reports
94 .remove(&pending_report_key(workflow_id, activity_id))
95 }
96
97 #[must_use]
99 pub fn len(&self) -> usize {
100 self.reports.len()
101 }
102
103 #[must_use]
105 pub fn is_empty(&self) -> bool {
106 self.reports.is_empty()
107 }
108
109 #[must_use]
111 pub fn get(
112 &self,
113 workflow_id: &WorkflowId,
114 activity_id: &ActivityId,
115 ) -> Option<&PendingActivityReport> {
116 self.reports
117 .get(&pending_report_key(workflow_id, activity_id))
118 }
119
120 #[must_use]
122 pub fn snapshot(&self) -> Vec<PendingActivityReport> {
123 self.reports.values().cloned().collect()
124 }
125}
126
/// Validated reconnect policy: a starting delay, a delay ceiling, and an attempt budget.
///
/// Built from a [`WorkerConfig`] by [`ReconnectBackoff::from_config`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReconnectBackoff {
    /// Delay used after the first failed attempt; doubled for each further failure.
    initial: Duration,
    /// Upper bound applied to every computed delay.
    max: Duration,
    /// Total number of reconnect attempts allowed.
    attempts: usize,
}
134
135impl ReconnectBackoff {
136 pub fn from_config(config: &WorkerConfig) -> Result<Self, WorkerError> {
142 if config.reconnect.initial_backoff.is_zero() {
143 return Err(WorkerError::registration(InvalidReconnectBackoff {
144 message: String::from("reconnect initial_backoff must be greater than zero"),
145 }));
146 }
147 if config.reconnect.max_backoff.is_zero() {
148 return Err(WorkerError::registration(InvalidReconnectBackoff {
149 message: String::from("reconnect max_backoff must be greater than zero"),
150 }));
151 }
152 if config.reconnect.max_attempts == 0 {
153 return Err(WorkerError::registration(InvalidReconnectBackoff {
154 message: String::from("reconnect max_attempts must be greater than zero"),
155 }));
156 }
157 Ok(Self {
158 initial: config.reconnect.initial_backoff,
159 max: config.reconnect.max_backoff,
160 attempts: config.reconnect.max_attempts,
161 })
162 }
163
164 #[must_use]
169 pub fn delay_for_attempt(&self, completed_failures: usize) -> Duration {
170 let bounded_shift = completed_failures.saturating_sub(1).min(31);
171 let shift = u32::try_from(bounded_shift).map_or(31, |shift| shift);
172 let factor = 1_u32.checked_shl(shift).map_or(u32::MAX, |factor| factor);
173 self.initial.saturating_mul(factor).min(self.max)
174 }
175
176 #[must_use]
178 pub const fn attempts(&self) -> usize {
179 self.attempts
180 }
181
182 #[must_use]
190 pub const fn max_delay(&self) -> Duration {
191 self.max
192 }
193}
194
195pub async fn connect_registered_grpc_session(
201 config: &WorkerConfig,
202 activity_types: Vec<String>,
203 available_handlers: &BTreeSet<String>,
204) -> Result<GrpcWorkerSession, WorkerError> {
205 let session = GrpcWorkerSession::connect(config.clone()).await?;
206 register_connected_session(session, config, activity_types, available_handlers).await
207}
208
209pub async fn register_connected_session<S>(
215 mut session: S,
216 config: &WorkerConfig,
217 activity_types: Vec<String>,
218 available_handlers: &BTreeSet<String>,
219) -> Result<S, WorkerError>
220where
221 S: WorkerSession,
222{
223 session.handshake(config).await?;
224 session.register(activity_types, available_handlers).await?;
225 Ok(session)
226}
227
228pub async fn reconnect_with_backoff<S, F, Fut>(
237 config: &WorkerConfig,
238 activity_types: Vec<String>,
239 available_handlers: &BTreeSet<String>,
240 connect: F,
241) -> Result<S, WorkerError>
242where
243 S: WorkerSession,
244 F: FnMut() -> Fut,
245 Fut: Future<Output = Result<S, WorkerError>>,
246{
247 reconnect_with_sleep(
248 config,
249 activity_types,
250 available_handlers,
251 connect,
252 tokio::time::sleep,
253 )
254 .await
255}
256
257pub async fn reconnect_with_sleep<S, F, Fut, Sleep, SleepFut>(
267 config: &WorkerConfig,
268 activity_types: Vec<String>,
269 available_handlers: &BTreeSet<String>,
270 mut connect: F,
271 mut sleep: Sleep,
272) -> Result<S, WorkerError>
273where
274 S: WorkerSession,
275 F: FnMut() -> Fut,
276 Fut: Future<Output = Result<S, WorkerError>>,
277 Sleep: FnMut(Duration) -> SleepFut,
278 SleepFut: Future<Output = ()>,
279{
280 let backoff = ReconnectBackoff::from_config(config)?;
281
282 for attempt in 1..=backoff.attempts() {
283 debug!(attempt, "attempting worker reconnect");
284 let result = match connect().await {
285 Ok(session) => {
286 register_connected_session(
287 session,
288 config,
289 activity_types.clone(),
290 available_handlers,
291 )
292 .await
293 }
294 Err(error) => Err(error),
295 };
296
297 match result {
298 Ok(session) => {
299 debug!(attempt, "worker reconnect succeeded");
300 return Ok(session);
301 }
302 Err(error) => {
303 if !error.is_retryable() {
304 error!(
305 attempt,
306 error = %error,
307 "worker reconnect denied by server; not retrying"
308 );
309 return Err(error);
310 }
311 if attempt == backoff.attempts() {
312 error!(attempt, error = %error, "worker reconnect attempts exhausted");
313 return Err(error);
314 }
315 let delay = backoff.delay_for_attempt(attempt);
316 warn!(
317 attempt,
318 delay_ms = delay.as_millis(),
319 error = %error,
320 "worker reconnect failed; backing off"
321 );
322 sleep(delay).await;
323 }
324 }
325 }
326
327 Err(WorkerError::registration(InvalidReconnectBackoff {
328 message: String::from("reconnect_max_attempts must be greater than zero"),
329 }))
330}
331
332pub async fn re_report_unacked<S>(
339 tracker: &UnackedResultTracker,
340 session: &mut S,
341) -> Result<(), WorkerError>
342where
343 S: WorkerSession,
344{
345 for report in tracker.snapshot() {
346 match report {
347 PendingActivityReport::Completed {
348 workflow_id,
349 activity_id,
350 output,
351 } => {
352 debug!(
353 workflow_id = %workflow_id,
354 activity_id = activity_id.sequence_position(),
355 "re-reporting unacknowledged activity result"
356 );
357 session
358 .report_result(workflow_id, activity_id, output)
359 .await?;
360 }
361 PendingActivityReport::Failed {
362 workflow_id,
363 activity_id,
364 failure,
365 } => {
366 debug!(
367 workflow_id = %workflow_id,
368 activity_id = activity_id.sequence_position(),
369 "re-reporting unacknowledged activity failure"
370 );
371 session
372 .report_failure(workflow_id, activity_id, failure)
373 .await?;
374 }
375 }
376 }
377 Ok(())
378}
379
/// Error source describing why the reconnect settings were rejected.
///
/// Its `Display` output is the message itself; it is passed to `WorkerError::registration`.
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
struct InvalidReconnectBackoff {
    /// Human-readable description of the rejected setting.
    message: String,
}
385
// Unit tests for the unacknowledged-report tracker and the reconnect loop. The reconnect tests
// drive `reconnect_with_sleep` with in-memory fake sessions and a recording sleep function, so no
// network access or real waiting is involved.
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::collections::BTreeSet;
    use std::rc::Rc;
    use std::time::Duration;

    use aion_core::{
        ActivityError, ActivityErrorKind, ActivityId, ContentType, Payload, WorkflowId,
    };
    use async_trait::async_trait;
    use futures::stream;

    use super::{
        PendingActivityReport, UnackedResultTracker, re_report_unacked, reconnect_with_sleep,
    };
    use crate::error::WorkerError;
    use crate::protocol::{
        WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
    };
    use crate::{ReconnectConfig, WorkerConfig};

    /// Two reports for the same workflow are tracked separately; acknowledging one removes only
    /// that one.
    #[test]
    fn tracker_records_reports_and_acknowledges_by_workflow_and_activity_id() {
        let workflow_id = WorkflowId::new_v4();
        let first_id = ActivityId::from_sequence_position(1);
        let second_id = ActivityId::from_sequence_position(2);
        let mut tracker = UnackedResultTracker::new();

        tracker.record(PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: first_id.clone(),
            output: Payload::new(ContentType::Json, b"{\"first\":true}".to_vec()),
        });
        tracker.record(PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: second_id.clone(),
            output: Payload::new(ContentType::Json, b"{\"second\":true}".to_vec()),
        });

        assert_eq!(tracker.len(), 2);
        assert!(tracker.acknowledge(&workflow_id, &first_id).is_some());
        assert_eq!(tracker.len(), 1);
        assert!(tracker.get(&workflow_id, &second_id).is_some());
        assert!(tracker.get(&workflow_id, &first_id).is_none());
    }

    /// The same sequence position in two different workflows must map to two distinct entries.
    #[test]
    fn tracker_keeps_reports_for_distinct_workflows_at_the_same_sequence_position() {
        let first_workflow = WorkflowId::new_v4();
        let second_workflow = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(3);
        let mut tracker = UnackedResultTracker::new();

        tracker.record(PendingActivityReport::Completed {
            workflow_id: first_workflow.clone(),
            activity_id: activity_id.clone(),
            output: Payload::new(ContentType::Json, b"{\"workflow\":\"a\"}".to_vec()),
        });
        tracker.record(PendingActivityReport::Completed {
            workflow_id: second_workflow.clone(),
            activity_id: activity_id.clone(),
            output: Payload::new(ContentType::Json, b"{\"workflow\":\"b\"}".to_vec()),
        });

        assert_eq!(tracker.len(), 2);
        assert!(tracker.get(&first_workflow, &activity_id).is_some());
        assert!(tracker.get(&second_workflow, &activity_id).is_some());
        assert!(
            tracker.acknowledge(&first_workflow, &activity_id).is_some(),
            "acknowledging one workflow's report must not require the other's"
        );
        assert_eq!(tracker.len(), 1);
        assert!(tracker.get(&second_workflow, &activity_id).is_some());
    }

    /// One retryable transport failure is followed by a single backoff sleep and then a
    /// successful handshake and registration on the second attempt.
    #[tokio::test]
    async fn reconnect_fails_once_then_handshakes_and_registers() -> Result<(), WorkerError> {
        let config = test_config();
        // Shared counters let the closures record calls while the test keeps its own handles.
        let attempts = Rc::new(RefCell::new(0usize));
        let sleeps = Rc::new(RefCell::new(Vec::new()));
        let activity_types = vec![String::from("charge-card")];
        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
        let attempts_for_connect = Rc::clone(&attempts);
        let sleeps_for_sleep = Rc::clone(&sleeps);

        let session = reconnect_with_sleep(
            &config,
            activity_types.clone(),
            &handlers,
            move || {
                let attempts_for_connect = Rc::clone(&attempts_for_connect);
                async move {
                    let mut attempts = attempts_for_connect.borrow_mut();
                    *attempts += 1;
                    // Only the very first connect attempt fails.
                    if *attempts == 1 {
                        Err(WorkerError::Transport {
                            source: tonic::Status::unavailable("disconnected"),
                        })
                    } else {
                        Ok(ReconnectFakeSession::default())
                    }
                }
            },
            move |delay| {
                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
                async move {
                    // Record the requested delay instead of actually waiting.
                    sleeps_for_sleep.borrow_mut().push(delay);
                }
            },
        )
        .await?;

        assert_eq!(*attempts.borrow(), 2);
        assert_eq!(*sleeps.borrow(), vec![Duration::from_millis(5)]);
        assert_eq!(session.handshakes, vec![String::from("worker-a")]);
        assert_eq!(session.registrations, vec![activity_types]);
        Ok(())
    }

    /// A `PermissionDenied` registration error ends the reconnect after one attempt with no
    /// sleep, and the gRPC status code and message stay visible on the returned error.
    #[tokio::test]
    async fn permission_denied_registration_stops_after_one_attempt() {
        let config = test_config();
        let attempts = Rc::new(RefCell::new(0usize));
        let sleeps = Rc::new(RefCell::new(Vec::new()));
        let activity_types = vec![String::from("charge-card")];
        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
        let attempts_for_connect = Rc::clone(&attempts);
        let sleeps_for_sleep = Rc::clone(&sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let attempts_for_connect = Rc::clone(&attempts_for_connect);
                async move {
                    *attempts_for_connect.borrow_mut() += 1;
                    // Connecting succeeds; the denial happens later, in `register`.
                    Ok(DeniedRegistrationSession {
                        denial: tonic::Status::permission_denied(
                            "namespace `payments` is not granted to subject `worker-a`",
                        ),
                    })
                }
            },
            move |delay| {
                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
                async move {
                    sleeps_for_sleep.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        // The assertion above already failed the test for `Ok`; this only extracts the error.
        let Err(error) = result else { return };
        assert_eq!(*attempts.borrow(), 1);
        assert!(sleeps.borrow().is_empty());
        assert!(!error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::PermissionDenied)
        ));
        assert_eq!(
            error.grpc_status().map(tonic::Status::message),
            Some("namespace `payments` is not granted to subject `worker-a`")
        );
        assert!(
            error
                .to_string()
                .contains("namespace `payments` is not granted to subject `worker-a`")
        );
    }

    /// An `Unauthenticated` handshake error returned by `connect` ends the reconnect after one
    /// attempt with no sleep.
    #[tokio::test]
    async fn unauthenticated_handshake_stops_after_one_attempt() {
        let config = test_config();
        let attempts = Rc::new(RefCell::new(0usize));
        let sleeps = Rc::new(RefCell::new(Vec::new()));
        let activity_types = vec![String::from("charge-card")];
        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
        let attempts_for_connect = Rc::clone(&attempts);
        let sleeps_for_sleep = Rc::clone(&sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let attempts_for_connect = Rc::clone(&attempts_for_connect);
                async move {
                    *attempts_for_connect.borrow_mut() += 1;
                    Err::<ReconnectFakeSession, _>(WorkerError::Handshake {
                        source: tonic::Status::unauthenticated("worker credentials were rejected"),
                    })
                }
            },
            move |delay| {
                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
                async move {
                    sleeps_for_sleep.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        // The assertion above already failed the test for `Ok`; this only extracts the error.
        let Err(error) = result else { return };
        assert_eq!(*attempts.borrow(), 1);
        assert!(sleeps.borrow().is_empty());
        assert!(!error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::Unauthenticated)
        ));
        assert!(
            error
                .to_string()
                .contains("worker credentials were rejected")
        );
    }

    /// A transport error that never clears is retried until the configured three attempts are
    /// used up, sleeping 5 ms and then 10 ms between them and not after the last one.
    #[tokio::test]
    async fn unavailable_transport_retries_until_attempts_exhausted() {
        let config = test_config();
        let attempts = Rc::new(RefCell::new(0usize));
        let sleeps = Rc::new(RefCell::new(Vec::new()));
        let activity_types = vec![String::from("charge-card")];
        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
        let attempts_for_connect = Rc::clone(&attempts);
        let sleeps_for_sleep = Rc::clone(&sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let attempts_for_connect = Rc::clone(&attempts_for_connect);
                async move {
                    *attempts_for_connect.borrow_mut() += 1;
                    Err::<ReconnectFakeSession, _>(WorkerError::Transport {
                        source: tonic::Status::unavailable("engine unreachable"),
                    })
                }
            },
            move |delay| {
                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
                async move {
                    sleeps_for_sleep.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        // The assertion above already failed the test for `Ok`; this only extracts the error.
        let Err(error) = result else { return };
        assert_eq!(*attempts.borrow(), 3);
        assert_eq!(
            *sleeps.borrow(),
            vec![Duration::from_millis(5), Duration::from_millis(10)]
        );
        assert!(error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::Unavailable)
        ));
    }

    /// Re-reporting sends the stored report through the session and leaves it in the tracker.
    #[tokio::test]
    async fn re_reports_unacked_reports_without_removing_them() -> Result<(), WorkerError> {
        let workflow_id = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(7);
        let output = Payload::new(ContentType::Json, b"{}".to_vec());
        let mut tracker = UnackedResultTracker::new();
        tracker.record(PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            output: output.clone(),
        });
        let mut session = ReconnectFakeSession::default();

        re_report_unacked(&tracker, &mut session).await?;

        assert_eq!(tracker.len(), 1);
        assert_eq!(
            session.reports,
            vec![RecordedReport::Completed(workflow_id, activity_id, output)]
        );
        Ok(())
    }

    /// Fake session whose operations all succeed and which records the calls it receives.
    #[derive(Default)]
    struct ReconnectFakeSession {
        // Worker identities seen by `handshake`, in call order.
        handshakes: Vec<String>,
        // Activity type lists seen by `register`, in call order.
        registrations: Vec<Vec<String>>,
        // Results and failures seen by `report_result` / `report_failure`, in call order.
        reports: Vec<RecordedReport>,
    }

    /// Fake session that accepts the handshake but answers registration, reports and heartbeats
    /// with a registration error wrapping `denial`.
    struct DeniedRegistrationSession {
        // gRPC status cloned into every error this session returns.
        denial: tonic::Status,
    }

    #[async_trait]
    impl WorkerSession for DeniedRegistrationSession {
        async fn handshake(&mut self, _config: &WorkerConfig) -> Result<(), WorkerError> {
            Ok(())
        }

        async fn register(
            &mut self,
            activity_types: Vec<String>,
            available_handlers: &BTreeSet<String>,
        ) -> Result<(), WorkerError> {
            // Handler validation runs first, so the denial is only returned for valid input.
            validate_activity_handlers(&activity_types, available_handlers)?;
            Err(WorkerError::Registration {
                source: Box::new(self.denial.clone()),
            })
        }

        fn receive_tasks(&mut self) -> WorkerTaskStream {
            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
        }

        async fn report_result(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            result: Payload,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, result));
            Err(WorkerError::Registration {
                source: Box::new(self.denial.clone()),
            })
        }

        async fn report_failure(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            failure: ActivityError,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, failure));
            Err(WorkerError::Registration {
                source: Box::new(self.denial.clone()),
            })
        }

        async fn send_heartbeat(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            progress: Option<Payload>,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, progress));
            Err(WorkerError::Registration {
                source: Box::new(self.denial.clone()),
            })
        }
    }

    /// A report call captured by `ReconnectFakeSession`, kept so tests can compare it exactly.
    #[derive(Clone, Debug, PartialEq, Eq)]
    enum RecordedReport {
        Completed(WorkflowId, ActivityId, Payload),
        Failed(WorkflowId, ActivityId, ActivityError),
    }

    #[async_trait]
    impl WorkerSession for ReconnectFakeSession {
        async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
            self.handshakes.push(config.identity.clone());
            Ok(())
        }

        async fn register(
            &mut self,
            activity_types: Vec<String>,
            available_handlers: &BTreeSet<String>,
        ) -> Result<(), WorkerError> {
            validate_activity_handlers(&activity_types, available_handlers)?;
            self.registrations.push(activity_types);
            Ok(())
        }

        fn receive_tasks(&mut self) -> WorkerTaskStream {
            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
        }

        async fn report_result(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            result: Payload,
        ) -> Result<(), WorkerError> {
            self.reports
                .push(RecordedReport::Completed(workflow_id, activity_id, result));
            Ok(())
        }

        async fn report_failure(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            failure: ActivityError,
        ) -> Result<(), WorkerError> {
            self.reports
                .push(RecordedReport::Failed(workflow_id, activity_id, failure));
            Ok(())
        }

        async fn send_heartbeat(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            progress: Option<Payload>,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, progress));
            Ok(())
        }
    }

    /// Worker configuration shared by the reconnect tests.
    ///
    /// The reconnect values (5 ms, 20 ms, 3) line up with what the tests assert: a first sleep of
    /// 5 ms and at most three attempts. NOTE(review): presumably the arguments are
    /// (initial_backoff, max_backoff, max_attempts) — confirm against `ReconnectConfig::new`.
    fn test_config() -> WorkerConfig {
        WorkerConfig::new(
            "http://127.0.0.1:50051",
            "payments",
            "worker-a",
            2,
            ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 3),
            None,
        )
    }

    /// Builds an `ActivityError` of kind `Terminal` with a fixed message and no details.
    fn terminal_failure() -> ActivityError {
        ActivityError {
            kind: ActivityErrorKind::Terminal,
            message: String::from("terminal"),
            details: None,
        }
    }

    /// Recording a second report for the same workflow/activity pair replaces the first one.
    #[test]
    fn tracker_replaces_existing_activity_report() {
        let workflow_id = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(9);
        let mut tracker = UnackedResultTracker::new();
        tracker.record(PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            output: Payload::new(ContentType::Json, b"{}".to_vec()),
        });
        tracker.record(PendingActivityReport::Failed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            failure: terminal_failure(),
        });

        assert_eq!(tracker.len(), 1);
        assert!(matches!(
            tracker.get(&workflow_id, &activity_id),
            Some(PendingActivityReport::Failed { .. })
        ));
    }
}