Skip to main content

aion_worker/protocol/
reconnect.rs

1//! Backoff reconnect, re-register, and re-report un-acked results.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::future::Future;
5use std::time::Duration;
6
7use aion_core::{ActivityError, ActivityId, Payload, WorkflowId};
8use tracing::{debug, error, warn};
9use uuid::Uuid;
10
11use crate::config::WorkerConfig;
12use crate::error::WorkerError;
13use crate::protocol::{GrpcWorkerSession, WorkerSession};
14
15/// Result or failure computed locally and not yet acknowledged by the engine.
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum PendingActivityReport {
18    /// Successful activity output to re-report until acknowledged.
19    Completed {
20        /// Workflow owning the activity.
21        workflow_id: WorkflowId,
22        /// Activity identifier used by AW for idempotent ingest.
23        activity_id: ActivityId,
24        /// Opaque activity output payload.
25        output: Payload,
26    },
27    /// Explicitly classified activity failure to re-report until acknowledged.
28    Failed {
29        /// Workflow owning the activity.
30        workflow_id: WorkflowId,
31        /// Activity identifier used by AW for idempotent ingest.
32        activity_id: ActivityId,
33        /// Classified activity error.
34        failure: ActivityError,
35    },
36}
37
38impl PendingActivityReport {
39    /// Returns the report's activity id key.
40    #[must_use]
41    pub const fn activity_id(&self) -> &ActivityId {
42        match self {
43            Self::Completed { activity_id, .. } | Self::Failed { activity_id, .. } => activity_id,
44        }
45    }
46
47    /// Returns the workflow owning the report's activity.
48    #[must_use]
49    pub const fn workflow_id(&self) -> &WorkflowId {
50        match self {
51            Self::Completed { workflow_id, .. } | Self::Failed { workflow_id, .. } => workflow_id,
52        }
53    }
54}
55
56/// Deterministic tracker key: activity ids are sequence positions scoped to
57/// one workflow, so distinct workflows legitimately collide on the bare
58/// position and must be keyed by workflow as well.
59type PendingReportKey = (Uuid, u64);
60
61fn pending_report_key(workflow_id: &WorkflowId, activity_id: &ActivityId) -> PendingReportKey {
62    (workflow_id.as_uuid(), activity_id.sequence_position())
63}
64
65/// In-memory source of truth for locally reported results awaiting engine ack.
66#[derive(Clone, Debug, Default, PartialEq, Eq)]
67pub struct UnackedResultTracker {
68    reports: BTreeMap<PendingReportKey, PendingActivityReport>,
69}
70
71impl UnackedResultTracker {
72    /// Creates an empty tracker.
73    #[must_use]
74    pub const fn new() -> Self {
75        Self {
76            reports: BTreeMap::new(),
77        }
78    }
79
80    /// Records a report, replacing any earlier pending report for the same
81    /// workflow and activity id.
82    pub fn record(&mut self, report: PendingActivityReport) {
83        let key = pending_report_key(report.workflow_id(), report.activity_id());
84        self.reports.insert(key, report);
85    }
86
87    /// Drops a report once the engine explicitly acknowledges it.
88    pub fn acknowledge(
89        &mut self,
90        workflow_id: &WorkflowId,
91        activity_id: &ActivityId,
92    ) -> Option<PendingActivityReport> {
93        self.reports
94            .remove(&pending_report_key(workflow_id, activity_id))
95    }
96
97    /// Returns the number of unacknowledged reports.
98    #[must_use]
99    pub fn len(&self) -> usize {
100        self.reports.len()
101    }
102
103    /// Returns true when no reports are waiting for acknowledgement.
104    #[must_use]
105    pub fn is_empty(&self) -> bool {
106        self.reports.is_empty()
107    }
108
109    /// Gets a pending report by its workflow and activity id.
110    #[must_use]
111    pub fn get(
112        &self,
113        workflow_id: &WorkflowId,
114        activity_id: &ActivityId,
115    ) -> Option<&PendingActivityReport> {
116        self.reports
117            .get(&pending_report_key(workflow_id, activity_id))
118    }
119
120    /// Returns a deterministic snapshot for re-reporting without holding a borrow.
121    #[must_use]
122    pub fn snapshot(&self) -> Vec<PendingActivityReport> {
123        self.reports.values().cloned().collect()
124    }
125}
126
127/// Validated reconnect backoff settings drawn from [`WorkerConfig`].
128#[derive(Clone, Debug, PartialEq, Eq)]
129pub struct ReconnectBackoff {
130    initial: Duration,
131    max: Duration,
132    attempts: usize,
133}
134
135impl ReconnectBackoff {
136    /// Builds reconnect backoff from worker config.
137    ///
138    /// # Errors
139    ///
140    /// Returns [`WorkerError::Registration`] if delays or attempt counts are zero.
141    pub fn from_config(config: &WorkerConfig) -> Result<Self, WorkerError> {
142        if config.reconnect.initial_backoff.is_zero() {
143            return Err(WorkerError::registration(InvalidReconnectBackoff {
144                message: String::from("reconnect initial_backoff must be greater than zero"),
145            }));
146        }
147        if config.reconnect.max_backoff.is_zero() {
148            return Err(WorkerError::registration(InvalidReconnectBackoff {
149                message: String::from("reconnect max_backoff must be greater than zero"),
150            }));
151        }
152        if config.reconnect.max_attempts == 0 {
153            return Err(WorkerError::registration(InvalidReconnectBackoff {
154                message: String::from("reconnect max_attempts must be greater than zero"),
155            }));
156        }
157        Ok(Self {
158            initial: config.reconnect.initial_backoff,
159            max: config.reconnect.max_backoff,
160            attempts: config.reconnect.max_attempts,
161        })
162    }
163
164    /// Returns the bounded exponential delay after `completed_failures` failures.
165    ///
166    /// The delay doubles per completed failure starting from the configured
167    /// initial backoff and is capped at the configured maximum backoff.
168    #[must_use]
169    pub fn delay_for_attempt(&self, completed_failures: usize) -> Duration {
170        let bounded_shift = completed_failures.saturating_sub(1).min(31);
171        let shift = u32::try_from(bounded_shift).map_or(31, |shift| shift);
172        let factor = 1_u32.checked_shl(shift).map_or(u32::MAX, |factor| factor);
173        self.initial.saturating_mul(factor).min(self.max)
174    }
175
176    /// Returns the configured maximum number of reconnect attempts.
177    #[must_use]
178    pub const fn attempts(&self) -> usize {
179        self.attempts
180    }
181
182    /// Returns the configured maximum backoff delay cap.
183    ///
184    /// The run loop also uses this as its session-health threshold: the cap
185    /// is the policy's own definition of the longest pause, so an
186    /// established session that survives longer than it is demonstrably past
187    /// the flapping regime and resets the cumulative drop budget when it
188    /// eventually drops.
189    #[must_use]
190    pub const fn max_delay(&self) -> Duration {
191        self.max
192    }
193}
194
195/// Connects, handshakes, and registers a fresh gRPC worker session.
196///
197/// # Errors
198///
199/// Returns [`WorkerError`] if connection, handshake, or registration fails.
200pub async fn connect_registered_grpc_session(
201    config: &WorkerConfig,
202    activity_types: Vec<String>,
203    available_handlers: &BTreeSet<String>,
204) -> Result<GrpcWorkerSession, WorkerError> {
205    let session = GrpcWorkerSession::connect(config.clone()).await?;
206    register_connected_session(session, config, activity_types, available_handlers).await
207}
208
209/// Handshakes and registers an already-connected session.
210///
211/// # Errors
212///
213/// Returns [`WorkerError`] if handshake or registration fails.
214pub async fn register_connected_session<S>(
215    mut session: S,
216    config: &WorkerConfig,
217    activity_types: Vec<String>,
218    available_handlers: &BTreeSet<String>,
219) -> Result<S, WorkerError>
220where
221    S: WorkerSession,
222{
223    session.handshake(config).await?;
224    session.register(activity_types, available_handlers).await?;
225    Ok(session)
226}
227
228/// Reconnects with bounded exponential backoff using an injected session factory.
229///
230/// # Errors
231///
232/// Returns the last [`WorkerError`] after configured attempts are exhausted, or
233/// immediately when the failure is a non-retryable `PermissionDenied` /
234/// `Unauthenticated` denial, or if the config contains invalid zero reconnect
235/// settings.
236pub async fn reconnect_with_backoff<S, F, Fut>(
237    config: &WorkerConfig,
238    activity_types: Vec<String>,
239    available_handlers: &BTreeSet<String>,
240    connect: F,
241) -> Result<S, WorkerError>
242where
243    S: WorkerSession,
244    F: FnMut() -> Fut,
245    Fut: Future<Output = Result<S, WorkerError>>,
246{
247    reconnect_with_sleep(
248        config,
249        activity_types,
250        available_handlers,
251        connect,
252        tokio::time::sleep,
253    )
254    .await
255}
256
257/// Testable reconnect helper with injectable sleep.
258///
259/// # Errors
260///
261/// Returns the last [`WorkerError`] after configured attempts are exhausted, or
262/// immediately — without consuming further attempts — when a failure is a
263/// non-retryable denial ([`WorkerError::is_retryable`] is false, i.e. the
264/// server answered `PermissionDenied` or `Unauthenticated`), or if the config
265/// contains invalid zero reconnect settings.
266pub async fn reconnect_with_sleep<S, F, Fut, Sleep, SleepFut>(
267    config: &WorkerConfig,
268    activity_types: Vec<String>,
269    available_handlers: &BTreeSet<String>,
270    mut connect: F,
271    mut sleep: Sleep,
272) -> Result<S, WorkerError>
273where
274    S: WorkerSession,
275    F: FnMut() -> Fut,
276    Fut: Future<Output = Result<S, WorkerError>>,
277    Sleep: FnMut(Duration) -> SleepFut,
278    SleepFut: Future<Output = ()>,
279{
280    let backoff = ReconnectBackoff::from_config(config)?;
281
282    for attempt in 1..=backoff.attempts() {
283        debug!(attempt, "attempting worker reconnect");
284        let result = match connect().await {
285            Ok(session) => {
286                register_connected_session(
287                    session,
288                    config,
289                    activity_types.clone(),
290                    available_handlers,
291                )
292                .await
293            }
294            Err(error) => Err(error),
295        };
296
297        match result {
298            Ok(session) => {
299                debug!(attempt, "worker reconnect succeeded");
300                return Ok(session);
301            }
302            Err(error) => {
303                if !error.is_retryable() {
304                    error!(
305                        attempt,
306                        error = %error,
307                        "worker reconnect denied by server; not retrying"
308                    );
309                    return Err(error);
310                }
311                if attempt == backoff.attempts() {
312                    error!(attempt, error = %error, "worker reconnect attempts exhausted");
313                    return Err(error);
314                }
315                let delay = backoff.delay_for_attempt(attempt);
316                warn!(
317                    attempt,
318                    delay_ms = delay.as_millis(),
319                    error = %error,
320                    "worker reconnect failed; backing off"
321                );
322                sleep(delay).await;
323            }
324        }
325    }
326
327    Err(WorkerError::registration(InvalidReconnectBackoff {
328        message: String::from("reconnect_max_attempts must be greater than zero"),
329    }))
330}
331
332/// Re-reports every unacknowledged result/failure before serving new work.
333///
334/// # Errors
335///
336/// Returns [`WorkerError`] if any re-report send fails. Entries are not removed;
337/// only explicit acknowledgement may clear the tracker.
338pub async fn re_report_unacked<S>(
339    tracker: &UnackedResultTracker,
340    session: &mut S,
341) -> Result<(), WorkerError>
342where
343    S: WorkerSession,
344{
345    for report in tracker.snapshot() {
346        match report {
347            PendingActivityReport::Completed {
348                workflow_id,
349                activity_id,
350                output,
351            } => {
352                debug!(
353                    workflow_id = %workflow_id,
354                    activity_id = activity_id.sequence_position(),
355                    "re-reporting unacknowledged activity result"
356                );
357                session
358                    .report_result(workflow_id, activity_id, output)
359                    .await?;
360            }
361            PendingActivityReport::Failed {
362                workflow_id,
363                activity_id,
364                failure,
365            } => {
366                debug!(
367                    workflow_id = %workflow_id,
368                    activity_id = activity_id.sequence_position(),
369                    "re-reporting unacknowledged activity failure"
370                );
371                session
372                    .report_failure(workflow_id, activity_id, failure)
373                    .await?;
374            }
375        }
376    }
377    Ok(())
378}
379
380#[derive(Debug, thiserror::Error)]
381#[error("{message}")]
382struct InvalidReconnectBackoff {
383    message: String,
384}
385
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::collections::BTreeSet;
    use std::rc::Rc;
    use std::time::Duration;

    use aion_core::{
        ActivityError, ActivityErrorKind, ActivityId, ContentType, Payload, WorkflowId,
    };
    use async_trait::async_trait;
    use futures::stream;

    use super::{
        PendingActivityReport, UnackedResultTracker, re_report_unacked, reconnect_with_sleep,
    };
    use crate::error::WorkerError;
    use crate::protocol::{
        WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
    };
    use crate::{ReconnectConfig, WorkerConfig};

    // ----- shared fixtures ---------------------------------------------------

    /// Denial text used by the permission-denied scenario.
    const NAMESPACE_DENIAL: &str = "namespace `payments` is not granted to subject `worker-a`";

    /// Counter shared between a test and its connect closure.
    type AttemptCounter = Rc<RefCell<usize>>;

    /// Log of the delays handed to the injected sleep closure.
    type SleepLog = Rc<RefCell<Vec<Duration>>>;

    /// Worker config with a 5 ms initial backoff, a 20 ms cap, and 3 attempts.
    fn test_config() -> WorkerConfig {
        let reconnect =
            ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 3);
        WorkerConfig::new(
            "http://127.0.0.1:50051",
            "payments",
            "worker-a",
            2,
            reconnect,
            None,
        )
    }

    /// A terminal activity failure with no details.
    fn terminal_failure() -> ActivityError {
        ActivityError {
            kind: ActivityErrorKind::Terminal,
            message: String::from("terminal"),
            details: None,
        }
    }

    /// Wraps raw bytes in a JSON-typed payload.
    fn json_payload(body: &[u8]) -> Payload {
        Payload::new(ContentType::Json, body.to_vec())
    }

    /// Builds a `Completed` report with a JSON payload for the given ids.
    fn completed(
        workflow_id: &WorkflowId,
        activity_id: &ActivityId,
        body: &[u8],
    ) -> PendingActivityReport {
        PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            output: json_payload(body),
        }
    }

    /// The single `charge-card` activity type together with its handler set.
    fn charge_card_registration() -> (Vec<String>, BTreeSet<String>) {
        let activity_types = vec![String::from("charge-card")];
        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
        (activity_types, handlers)
    }

    // ----- fake sessions -----------------------------------------------------

    /// What a [`ReconnectFakeSession`] was asked to report.
    #[derive(Clone, Debug, PartialEq, Eq)]
    enum RecordedReport {
        Completed(WorkflowId, ActivityId, Payload),
        Failed(WorkflowId, ActivityId, ActivityError),
    }

    /// Session that accepts everything and records what it was asked to do.
    #[derive(Default)]
    struct ReconnectFakeSession {
        handshakes: Vec<String>,
        registrations: Vec<Vec<String>>,
        reports: Vec<RecordedReport>,
    }

    #[async_trait]
    impl WorkerSession for ReconnectFakeSession {
        async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
            let identity = config.identity.clone();
            self.handshakes.push(identity);
            Ok(())
        }

        async fn register(
            &mut self,
            activity_types: Vec<String>,
            available_handlers: &BTreeSet<String>,
        ) -> Result<(), WorkerError> {
            validate_activity_handlers(&activity_types, available_handlers)?;
            self.registrations.push(activity_types);
            Ok(())
        }

        fn receive_tasks(&mut self) -> WorkerTaskStream {
            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
        }

        async fn report_result(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            result: Payload,
        ) -> Result<(), WorkerError> {
            let recorded = RecordedReport::Completed(workflow_id, activity_id, result);
            self.reports.push(recorded);
            Ok(())
        }

        async fn report_failure(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            failure: ActivityError,
        ) -> Result<(), WorkerError> {
            let recorded = RecordedReport::Failed(workflow_id, activity_id, failure);
            self.reports.push(recorded);
            Ok(())
        }

        async fn send_heartbeat(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            progress: Option<Payload>,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, progress));
            Ok(())
        }
    }

    /// Session whose registration is rejected by the server with a gRPC denial,
    /// mirroring `aion-server` answering `PermissionDenied` for an ungranted
    /// namespace.
    struct DeniedRegistrationSession {
        denial: tonic::Status,
    }

    impl DeniedRegistrationSession {
        /// Wraps a copy of the configured denial in a registration error.
        fn rejection(&self) -> WorkerError {
            WorkerError::Registration {
                source: Box::new(self.denial.clone()),
            }
        }
    }

    #[async_trait]
    impl WorkerSession for DeniedRegistrationSession {
        async fn handshake(&mut self, _config: &WorkerConfig) -> Result<(), WorkerError> {
            Ok(())
        }

        async fn register(
            &mut self,
            activity_types: Vec<String>,
            available_handlers: &BTreeSet<String>,
        ) -> Result<(), WorkerError> {
            validate_activity_handlers(&activity_types, available_handlers)?;
            Err(self.rejection())
        }

        fn receive_tasks(&mut self) -> WorkerTaskStream {
            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
        }

        async fn report_result(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            result: Payload,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, result));
            Err(self.rejection())
        }

        async fn report_failure(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            failure: ActivityError,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, failure));
            Err(self.rejection())
        }

        async fn send_heartbeat(
            &mut self,
            workflow_id: WorkflowId,
            activity_id: ActivityId,
            progress: Option<Payload>,
        ) -> Result<(), WorkerError> {
            drop((workflow_id, activity_id, progress));
            Err(self.rejection())
        }
    }

    // ----- tracker behaviour -------------------------------------------------

    #[test]
    fn tracker_records_reports_and_acknowledges_by_workflow_and_activity_id() {
        let workflow_id = WorkflowId::new_v4();
        let first_id = ActivityId::from_sequence_position(1);
        let second_id = ActivityId::from_sequence_position(2);
        let mut tracker = UnackedResultTracker::new();

        tracker.record(completed(&workflow_id, &first_id, b"{\"first\":true}"));
        tracker.record(completed(&workflow_id, &second_id, b"{\"second\":true}"));
        assert_eq!(tracker.len(), 2);

        let acknowledged = tracker.acknowledge(&workflow_id, &first_id);
        assert!(acknowledged.is_some());
        assert_eq!(tracker.len(), 1);
        assert!(tracker.get(&workflow_id, &second_id).is_some());
        assert!(tracker.get(&workflow_id, &first_id).is_none());
    }

    #[test]
    fn tracker_keeps_reports_for_distinct_workflows_at_the_same_sequence_position() {
        let first_workflow = WorkflowId::new_v4();
        let second_workflow = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(3);
        let mut tracker = UnackedResultTracker::new();

        tracker.record(completed(
            &first_workflow,
            &activity_id,
            b"{\"workflow\":\"a\"}",
        ));
        tracker.record(completed(
            &second_workflow,
            &activity_id,
            b"{\"workflow\":\"b\"}",
        ));

        assert_eq!(tracker.len(), 2);
        assert!(tracker.get(&first_workflow, &activity_id).is_some());
        assert!(tracker.get(&second_workflow, &activity_id).is_some());
        assert!(
            tracker.acknowledge(&first_workflow, &activity_id).is_some(),
            "acknowledging one workflow's report must not require the other's"
        );
        assert_eq!(tracker.len(), 1);
        assert!(tracker.get(&second_workflow, &activity_id).is_some());
    }

    #[test]
    fn tracker_replaces_existing_activity_report() {
        let workflow_id = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(9);
        let mut tracker = UnackedResultTracker::new();

        tracker.record(completed(&workflow_id, &activity_id, b"{}"));
        tracker.record(PendingActivityReport::Failed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            failure: terminal_failure(),
        });

        assert_eq!(tracker.len(), 1);
        let stored = tracker.get(&workflow_id, &activity_id);
        assert!(matches!(stored, Some(PendingActivityReport::Failed { .. })));
    }

    // ----- reconnect behaviour -----------------------------------------------

    #[tokio::test]
    async fn reconnect_fails_once_then_handshakes_and_registers() -> Result<(), WorkerError> {
        let config = test_config();
        let (activity_types, handlers) = charge_card_registration();
        let connect_calls = AttemptCounter::default();
        let recorded_sleeps = SleepLog::default();
        let calls_handle = Rc::clone(&connect_calls);
        let sleeps_handle = Rc::clone(&recorded_sleeps);

        let session = reconnect_with_sleep(
            &config,
            activity_types.clone(),
            &handlers,
            move || {
                let calls = Rc::clone(&calls_handle);
                async move {
                    let call_number = {
                        let mut count = calls.borrow_mut();
                        *count += 1;
                        *count
                    };
                    // Only the very first connect fails.
                    if call_number > 1 {
                        Ok(ReconnectFakeSession::default())
                    } else {
                        Err(WorkerError::Transport {
                            source: tonic::Status::unavailable("disconnected"),
                        })
                    }
                }
            },
            move |delay| {
                let log = Rc::clone(&sleeps_handle);
                async move {
                    log.borrow_mut().push(delay);
                }
            },
        )
        .await?;

        assert_eq!(*connect_calls.borrow(), 2);
        assert_eq!(*recorded_sleeps.borrow(), vec![Duration::from_millis(5)]);
        assert_eq!(session.handshakes, vec![String::from("worker-a")]);
        assert_eq!(session.registrations, vec![activity_types]);
        Ok(())
    }

    #[tokio::test]
    async fn permission_denied_registration_stops_after_one_attempt() {
        let config = test_config();
        let (activity_types, handlers) = charge_card_registration();
        let connect_calls = AttemptCounter::default();
        let recorded_sleeps = SleepLog::default();
        let calls_handle = Rc::clone(&connect_calls);
        let sleeps_handle = Rc::clone(&recorded_sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let calls = Rc::clone(&calls_handle);
                async move {
                    *calls.borrow_mut() += 1;
                    Ok(DeniedRegistrationSession {
                        denial: tonic::Status::permission_denied(NAMESPACE_DENIAL),
                    })
                }
            },
            move |delay| {
                let log = Rc::clone(&sleeps_handle);
                async move {
                    log.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        let Err(error) = result else { return };
        assert_eq!(*connect_calls.borrow(), 1);
        assert!(recorded_sleeps.borrow().is_empty());
        assert!(!error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::PermissionDenied)
        ));
        assert_eq!(
            error.grpc_status().map(tonic::Status::message),
            Some(NAMESPACE_DENIAL)
        );
        assert!(error.to_string().contains(NAMESPACE_DENIAL));
    }

    #[tokio::test]
    async fn unauthenticated_handshake_stops_after_one_attempt() {
        let config = test_config();
        let (activity_types, handlers) = charge_card_registration();
        let connect_calls = AttemptCounter::default();
        let recorded_sleeps = SleepLog::default();
        let calls_handle = Rc::clone(&connect_calls);
        let sleeps_handle = Rc::clone(&recorded_sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let calls = Rc::clone(&calls_handle);
                async move {
                    *calls.borrow_mut() += 1;
                    Err::<ReconnectFakeSession, _>(WorkerError::Handshake {
                        source: tonic::Status::unauthenticated("worker credentials were rejected"),
                    })
                }
            },
            move |delay| {
                let log = Rc::clone(&sleeps_handle);
                async move {
                    log.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        let Err(error) = result else { return };
        assert_eq!(*connect_calls.borrow(), 1);
        assert!(recorded_sleeps.borrow().is_empty());
        assert!(!error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::Unauthenticated)
        ));
        let rendered = error.to_string();
        assert!(rendered.contains("worker credentials were rejected"));
    }

    #[tokio::test]
    async fn unavailable_transport_retries_until_attempts_exhausted() {
        let config = test_config();
        let (activity_types, handlers) = charge_card_registration();
        let connect_calls = AttemptCounter::default();
        let recorded_sleeps = SleepLog::default();
        let calls_handle = Rc::clone(&connect_calls);
        let sleeps_handle = Rc::clone(&recorded_sleeps);

        let result = reconnect_with_sleep(
            &config,
            activity_types,
            &handlers,
            move || {
                let calls = Rc::clone(&calls_handle);
                async move {
                    *calls.borrow_mut() += 1;
                    Err::<ReconnectFakeSession, _>(WorkerError::Transport {
                        source: tonic::Status::unavailable("engine unreachable"),
                    })
                }
            },
            move |delay| {
                let log = Rc::clone(&sleeps_handle);
                async move {
                    log.borrow_mut().push(delay);
                }
            },
        )
        .await;

        assert!(result.is_err());
        let Err(error) = result else { return };
        assert_eq!(*connect_calls.borrow(), 3);
        // Two pauses separate three attempts; no pause follows the last one.
        assert_eq!(
            *recorded_sleeps.borrow(),
            vec![Duration::from_millis(5), Duration::from_millis(10)]
        );
        assert!(error.is_retryable());
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::Unavailable)
        ));
    }

    // ----- re-report behaviour -----------------------------------------------

    #[tokio::test]
    async fn re_reports_unacked_reports_without_removing_them() -> Result<(), WorkerError> {
        let workflow_id = WorkflowId::new_v4();
        let activity_id = ActivityId::from_sequence_position(7);
        let output = json_payload(b"{}");
        let mut tracker = UnackedResultTracker::new();
        tracker.record(PendingActivityReport::Completed {
            workflow_id: workflow_id.clone(),
            activity_id: activity_id.clone(),
            output: output.clone(),
        });
        let mut session = ReconnectFakeSession::default();

        re_report_unacked(&tracker, &mut session).await?;

        assert_eq!(tracker.len(), 1);
        let expected = vec![RecordedReport::Completed(workflow_id, activity_id, output)];
        assert_eq!(session.reports, expected);
        Ok(())
    }
}