Skip to main content

aion_worker/protocol/
reconnect.rs

1//! Backoff reconnect, re-register, and re-report un-acked results.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::future::Future;
5use std::time::Duration;
6
7use aion_core::{ActivityError, ActivityId, Payload, WorkflowId};
8use tracing::{debug, error, warn};
9use uuid::Uuid;
10
11use crate::config::WorkerConfig;
12use crate::error::WorkerError;
13use crate::protocol::{GrpcWorkerSession, WorkerSession};
14
15/// Result or failure computed locally and not yet acknowledged by the engine.
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum PendingActivityReport {
18    /// Successful activity output to re-report until acknowledged.
19    Completed {
20        /// Workflow owning the activity.
21        workflow_id: WorkflowId,
22        /// Activity identifier used by AW for idempotent ingest.
23        activity_id: ActivityId,
24        /// Opaque activity output payload.
25        output: Payload,
26    },
27    /// Explicitly classified activity failure to re-report until acknowledged.
28    Failed {
29        /// Workflow owning the activity.
30        workflow_id: WorkflowId,
31        /// Activity identifier used by AW for idempotent ingest.
32        activity_id: ActivityId,
33        /// Classified activity error.
34        failure: ActivityError,
35    },
36}
37
38impl PendingActivityReport {
39    /// Returns the report's activity id key.
40    #[must_use]
41    pub const fn activity_id(&self) -> &ActivityId {
42        match self {
43            Self::Completed { activity_id, .. } | Self::Failed { activity_id, .. } => activity_id,
44        }
45    }
46
47    /// Returns the workflow owning the report's activity.
48    #[must_use]
49    pub const fn workflow_id(&self) -> &WorkflowId {
50        match self {
51            Self::Completed { workflow_id, .. } | Self::Failed { workflow_id, .. } => workflow_id,
52        }
53    }
54}
55
56/// Deterministic tracker key: activity ids are sequence positions scoped to
57/// one workflow, so distinct workflows legitimately collide on the bare
58/// position and must be keyed by workflow as well.
59type PendingReportKey = (Uuid, u64);
60
61fn pending_report_key(workflow_id: &WorkflowId, activity_id: &ActivityId) -> PendingReportKey {
62    (workflow_id.as_uuid(), activity_id.sequence_position())
63}
64
65/// In-memory source of truth for locally reported results awaiting engine ack.
66#[derive(Clone, Debug, Default, PartialEq, Eq)]
67pub struct UnackedResultTracker {
68    reports: BTreeMap<PendingReportKey, PendingActivityReport>,
69}
70
71impl UnackedResultTracker {
72    /// Creates an empty tracker.
73    #[must_use]
74    pub const fn new() -> Self {
75        Self {
76            reports: BTreeMap::new(),
77        }
78    }
79
80    /// Records a report, replacing any earlier pending report for the same
81    /// workflow and activity id.
82    pub fn record(&mut self, report: PendingActivityReport) {
83        let key = pending_report_key(report.workflow_id(), report.activity_id());
84        self.reports.insert(key, report);
85    }
86
87    /// Drops a report once the engine explicitly acknowledges it.
88    pub fn acknowledge(
89        &mut self,
90        workflow_id: &WorkflowId,
91        activity_id: &ActivityId,
92    ) -> Option<PendingActivityReport> {
93        self.reports
94            .remove(&pending_report_key(workflow_id, activity_id))
95    }
96
97    /// Returns the number of unacknowledged reports.
98    #[must_use]
99    pub fn len(&self) -> usize {
100        self.reports.len()
101    }
102
103    /// Returns true when no reports are waiting for acknowledgement.
104    #[must_use]
105    pub fn is_empty(&self) -> bool {
106        self.reports.is_empty()
107    }
108
109    /// Gets a pending report by its workflow and activity id.
110    #[must_use]
111    pub fn get(
112        &self,
113        workflow_id: &WorkflowId,
114        activity_id: &ActivityId,
115    ) -> Option<&PendingActivityReport> {
116        self.reports
117            .get(&pending_report_key(workflow_id, activity_id))
118    }
119
120    /// Returns a deterministic snapshot for re-reporting without holding a borrow.
121    #[must_use]
122    pub fn snapshot(&self) -> Vec<PendingActivityReport> {
123        self.reports.values().cloned().collect()
124    }
125}
126
127/// Validated reconnect backoff settings drawn from [`WorkerConfig`].
128#[derive(Clone, Debug, PartialEq, Eq)]
129pub struct ReconnectBackoff {
130    initial: Duration,
131    max: Duration,
132    attempts: usize,
133}
134
135impl ReconnectBackoff {
136    /// Builds reconnect backoff from worker config.
137    ///
138    /// # Errors
139    ///
140    /// Returns [`WorkerError::Registration`] if delays or attempt counts are zero.
141    pub fn from_config(config: &WorkerConfig) -> Result<Self, WorkerError> {
142        if config.reconnect.initial_backoff.is_zero() {
143            return Err(WorkerError::registration(InvalidReconnectBackoff {
144                message: String::from("reconnect initial_backoff must be greater than zero"),
145            }));
146        }
147        if config.reconnect.max_backoff.is_zero() {
148            return Err(WorkerError::registration(InvalidReconnectBackoff {
149                message: String::from("reconnect max_backoff must be greater than zero"),
150            }));
151        }
152        if config.reconnect.max_attempts == 0 {
153            return Err(WorkerError::registration(InvalidReconnectBackoff {
154                message: String::from("reconnect max_attempts must be greater than zero"),
155            }));
156        }
157        Ok(Self {
158            initial: config.reconnect.initial_backoff,
159            max: config.reconnect.max_backoff,
160            attempts: config.reconnect.max_attempts,
161        })
162    }
163
164    /// Returns the bounded exponential delay after `completed_failures` failures.
165    ///
166    /// The delay doubles per completed failure starting from the configured
167    /// initial backoff and is capped at the configured maximum backoff.
168    #[must_use]
169    pub fn delay_for_attempt(&self, completed_failures: usize) -> Duration {
170        let bounded_shift = completed_failures.saturating_sub(1).min(31);
171        let shift = u32::try_from(bounded_shift).map_or(31, |shift| shift);
172        let factor = 1_u32.checked_shl(shift).map_or(u32::MAX, |factor| factor);
173        self.initial.saturating_mul(factor).min(self.max)
174    }
175
176    /// Returns the configured maximum number of reconnect attempts.
177    #[must_use]
178    pub const fn attempts(&self) -> usize {
179        self.attempts
180    }
181
182    /// Returns the configured maximum backoff delay cap.
183    ///
184    /// The run loop also uses this as its session-health threshold: the cap
185    /// is the policy's own definition of the longest pause, so an
186    /// established session that survives longer than it is demonstrably past
187    /// the flapping regime and resets the cumulative drop budget when it
188    /// eventually drops.
189    #[must_use]
190    pub const fn max_delay(&self) -> Duration {
191        self.max
192    }
193}
194
195/// Connects, handshakes, and registers a fresh gRPC worker session.
196///
197/// # Errors
198///
199/// Returns [`WorkerError`] if connection, handshake, or registration fails.
200pub async fn connect_registered_grpc_session(
201    config: &WorkerConfig,
202    activity_types: Vec<String>,
203    available_handlers: &BTreeSet<String>,
204) -> Result<GrpcWorkerSession, WorkerError> {
205    let session = GrpcWorkerSession::connect(config.clone()).await?;
206    register_connected_session(session, config, activity_types, available_handlers).await
207}
208
209/// Handshakes and registers an already-connected session.
210///
211/// # Errors
212///
213/// Returns [`WorkerError`] if handshake or registration fails.
214pub async fn register_connected_session<S>(
215    mut session: S,
216    config: &WorkerConfig,
217    activity_types: Vec<String>,
218    available_handlers: &BTreeSet<String>,
219) -> Result<S, WorkerError>
220where
221    S: WorkerSession,
222{
223    session.handshake(config).await?;
224    session.register(activity_types, available_handlers).await?;
225    Ok(session)
226}
227
228/// Reconnects with bounded exponential backoff using an injected session factory.
229///
230/// # Errors
231///
232/// Returns the last [`WorkerError`] after configured attempts are exhausted, or
233/// immediately when the failure is a non-retryable `PermissionDenied` /
234/// `Unauthenticated` denial, or if the config contains invalid zero reconnect
235/// settings.
236pub async fn reconnect_with_backoff<S, F, Fut>(
237    config: &WorkerConfig,
238    activity_types: Vec<String>,
239    available_handlers: &BTreeSet<String>,
240    connect: F,
241) -> Result<S, WorkerError>
242where
243    S: WorkerSession,
244    F: FnMut() -> Fut,
245    Fut: Future<Output = Result<S, WorkerError>>,
246{
247    reconnect_with_sleep(
248        config,
249        activity_types,
250        available_handlers,
251        connect,
252        tokio::time::sleep,
253    )
254    .await
255}
256
257/// Testable reconnect helper with injectable sleep.
258///
259/// # Errors
260///
261/// Returns the last [`WorkerError`] after configured attempts are exhausted, or
262/// immediately — without consuming further attempts — when a failure is a
263/// non-retryable denial ([`WorkerError::is_retryable`] is false, i.e. the
264/// server answered `PermissionDenied` or `Unauthenticated`), or if the config
265/// contains invalid zero reconnect settings.
266pub async fn reconnect_with_sleep<S, F, Fut, Sleep, SleepFut>(
267    config: &WorkerConfig,
268    activity_types: Vec<String>,
269    available_handlers: &BTreeSet<String>,
270    mut connect: F,
271    mut sleep: Sleep,
272) -> Result<S, WorkerError>
273where
274    S: WorkerSession,
275    F: FnMut() -> Fut,
276    Fut: Future<Output = Result<S, WorkerError>>,
277    Sleep: FnMut(Duration) -> SleepFut,
278    SleepFut: Future<Output = ()>,
279{
280    let backoff = ReconnectBackoff::from_config(config)?;
281
282    for attempt in 1..=backoff.attempts() {
283        debug!(attempt, "attempting worker reconnect");
284        let result = match connect().await {
285            Ok(session) => {
286                register_connected_session(
287                    session,
288                    config,
289                    activity_types.clone(),
290                    available_handlers,
291                )
292                .await
293            }
294            Err(error) => Err(error),
295        };
296
297        match result {
298            Ok(session) => {
299                debug!(attempt, "worker reconnect succeeded");
300                return Ok(session);
301            }
302            Err(error) => {
303                if !error.is_retryable() {
304                    error!(
305                        attempt,
306                        error = %error,
307                        "worker reconnect denied by server; not retrying"
308                    );
309                    return Err(error);
310                }
311                if attempt == backoff.attempts() {
312                    error!(attempt, error = %error, "worker reconnect attempts exhausted");
313                    return Err(error);
314                }
315                let delay = backoff.delay_for_attempt(attempt);
316                warn!(
317                    attempt,
318                    delay_ms = delay.as_millis(),
319                    error = %error,
320                    "worker reconnect failed; backing off"
321                );
322                sleep(delay).await;
323            }
324        }
325    }
326
327    Err(WorkerError::registration(InvalidReconnectBackoff {
328        message: String::from("reconnect_max_attempts must be greater than zero"),
329    }))
330}
331
332/// Re-reports every unacknowledged result/failure before serving new work.
333///
334/// Server `ResultAck` frames clear entries mid-session, so the steady-state
335/// backlog is empty and this replay decays to the still-unacked residue.
336/// Each send carries the session's per-send deadline.
337///
338/// # Errors
339///
340/// Returns [`WorkerError`] if any re-report send fails. Entries are not removed
341/// by sending; only the explicit `ResultAck` acknowledgement clears the tracker.
342pub async fn re_report_unacked<S>(
343    tracker: &UnackedResultTracker,
344    session: &mut S,
345) -> Result<(), WorkerError>
346where
347    S: WorkerSession,
348{
349    for report in tracker.snapshot() {
350        match report {
351            PendingActivityReport::Completed {
352                workflow_id,
353                activity_id,
354                output,
355            } => {
356                debug!(
357                    workflow_id = %workflow_id,
358                    activity_id = activity_id.sequence_position(),
359                    "re-reporting unacknowledged activity result"
360                );
361                session
362                    .report_result(workflow_id, activity_id, output)
363                    .await?;
364            }
365            PendingActivityReport::Failed {
366                workflow_id,
367                activity_id,
368                failure,
369            } => {
370                debug!(
371                    workflow_id = %workflow_id,
372                    activity_id = activity_id.sequence_position(),
373                    "re-reporting unacknowledged activity failure"
374                );
375                session
376                    .report_failure(workflow_id, activity_id, failure)
377                    .await?;
378            }
379        }
380    }
381    Ok(())
382}
383
384#[derive(Debug, thiserror::Error)]
385#[error("{message}")]
386struct InvalidReconnectBackoff {
387    message: String,
388}
389
390#[cfg(test)]
391mod tests {
392    use std::cell::RefCell;
393    use std::collections::BTreeSet;
394    use std::rc::Rc;
395    use std::time::Duration;
396
397    use aion_core::{
398        ActivityError, ActivityErrorKind, ActivityId, ContentType, Payload, WorkflowId,
399    };
400    use async_trait::async_trait;
401    use futures::stream;
402
403    use super::{
404        PendingActivityReport, UnackedResultTracker, re_report_unacked, reconnect_with_sleep,
405    };
406    use crate::error::WorkerError;
407    use crate::protocol::{
408        WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
409    };
410    use crate::{ReconnectConfig, WorkerConfig};
411
412    #[test]
413    fn tracker_records_reports_and_acknowledges_by_workflow_and_activity_id() {
414        let workflow_id = WorkflowId::new_v4();
415        let first_id = ActivityId::from_sequence_position(1);
416        let second_id = ActivityId::from_sequence_position(2);
417        let mut tracker = UnackedResultTracker::new();
418
419        tracker.record(PendingActivityReport::Completed {
420            workflow_id: workflow_id.clone(),
421            activity_id: first_id.clone(),
422            output: Payload::new(ContentType::Json, b"{\"first\":true}".to_vec()),
423        });
424        tracker.record(PendingActivityReport::Completed {
425            workflow_id: workflow_id.clone(),
426            activity_id: second_id.clone(),
427            output: Payload::new(ContentType::Json, b"{\"second\":true}".to_vec()),
428        });
429
430        assert_eq!(tracker.len(), 2);
431        assert!(tracker.acknowledge(&workflow_id, &first_id).is_some());
432        assert_eq!(tracker.len(), 1);
433        assert!(tracker.get(&workflow_id, &second_id).is_some());
434        assert!(tracker.get(&workflow_id, &first_id).is_none());
435    }
436
437    #[test]
438    fn tracker_keeps_reports_for_distinct_workflows_at_the_same_sequence_position() {
439        let first_workflow = WorkflowId::new_v4();
440        let second_workflow = WorkflowId::new_v4();
441        let activity_id = ActivityId::from_sequence_position(3);
442        let mut tracker = UnackedResultTracker::new();
443
444        tracker.record(PendingActivityReport::Completed {
445            workflow_id: first_workflow.clone(),
446            activity_id: activity_id.clone(),
447            output: Payload::new(ContentType::Json, b"{\"workflow\":\"a\"}".to_vec()),
448        });
449        tracker.record(PendingActivityReport::Completed {
450            workflow_id: second_workflow.clone(),
451            activity_id: activity_id.clone(),
452            output: Payload::new(ContentType::Json, b"{\"workflow\":\"b\"}".to_vec()),
453        });
454
455        assert_eq!(tracker.len(), 2);
456        assert!(tracker.get(&first_workflow, &activity_id).is_some());
457        assert!(tracker.get(&second_workflow, &activity_id).is_some());
458        assert!(
459            tracker.acknowledge(&first_workflow, &activity_id).is_some(),
460            "acknowledging one workflow's report must not require the other's"
461        );
462        assert_eq!(tracker.len(), 1);
463        assert!(tracker.get(&second_workflow, &activity_id).is_some());
464    }
465
466    #[tokio::test]
467    async fn reconnect_fails_once_then_handshakes_and_registers() -> Result<(), WorkerError> {
468        let config = test_config();
469        let attempts = Rc::new(RefCell::new(0usize));
470        let sleeps = Rc::new(RefCell::new(Vec::new()));
471        let activity_types = vec![String::from("charge-card")];
472        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
473        let attempts_for_connect = Rc::clone(&attempts);
474        let sleeps_for_sleep = Rc::clone(&sleeps);
475
476        let session = reconnect_with_sleep(
477            &config,
478            activity_types.clone(),
479            &handlers,
480            move || {
481                let attempts_for_connect = Rc::clone(&attempts_for_connect);
482                async move {
483                    let mut attempts = attempts_for_connect.borrow_mut();
484                    *attempts += 1;
485                    if *attempts == 1 {
486                        Err(WorkerError::Transport {
487                            source: tonic::Status::unavailable("disconnected"),
488                        })
489                    } else {
490                        Ok(ReconnectFakeSession::default())
491                    }
492                }
493            },
494            move |delay| {
495                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
496                async move {
497                    sleeps_for_sleep.borrow_mut().push(delay);
498                }
499            },
500        )
501        .await?;
502
503        assert_eq!(*attempts.borrow(), 2);
504        assert_eq!(*sleeps.borrow(), vec![Duration::from_millis(5)]);
505        assert_eq!(session.handshakes, vec![String::from("worker-a")]);
506        assert_eq!(session.registrations, vec![activity_types]);
507        Ok(())
508    }
509
510    #[tokio::test]
511    async fn permission_denied_registration_stops_after_one_attempt() {
512        let config = test_config();
513        let attempts = Rc::new(RefCell::new(0usize));
514        let sleeps = Rc::new(RefCell::new(Vec::new()));
515        let activity_types = vec![String::from("charge-card")];
516        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
517        let attempts_for_connect = Rc::clone(&attempts);
518        let sleeps_for_sleep = Rc::clone(&sleeps);
519
520        let result = reconnect_with_sleep(
521            &config,
522            activity_types,
523            &handlers,
524            move || {
525                let attempts_for_connect = Rc::clone(&attempts_for_connect);
526                async move {
527                    *attempts_for_connect.borrow_mut() += 1;
528                    Ok(DeniedRegistrationSession {
529                        denial: tonic::Status::permission_denied(
530                            "namespace `payments` is not granted to subject `worker-a`",
531                        ),
532                    })
533                }
534            },
535            move |delay| {
536                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
537                async move {
538                    sleeps_for_sleep.borrow_mut().push(delay);
539                }
540            },
541        )
542        .await;
543
544        assert!(result.is_err());
545        let Err(error) = result else { return };
546        assert_eq!(*attempts.borrow(), 1);
547        assert!(sleeps.borrow().is_empty());
548        assert!(!error.is_retryable());
549        assert!(matches!(
550            error.grpc_status().map(tonic::Status::code),
551            Some(tonic::Code::PermissionDenied)
552        ));
553        assert_eq!(
554            error.grpc_status().map(tonic::Status::message),
555            Some("namespace `payments` is not granted to subject `worker-a`")
556        );
557        assert!(
558            error
559                .to_string()
560                .contains("namespace `payments` is not granted to subject `worker-a`")
561        );
562    }
563
564    #[tokio::test]
565    async fn unauthenticated_handshake_stops_after_one_attempt() {
566        let config = test_config();
567        let attempts = Rc::new(RefCell::new(0usize));
568        let sleeps = Rc::new(RefCell::new(Vec::new()));
569        let activity_types = vec![String::from("charge-card")];
570        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
571        let attempts_for_connect = Rc::clone(&attempts);
572        let sleeps_for_sleep = Rc::clone(&sleeps);
573
574        let result = reconnect_with_sleep(
575            &config,
576            activity_types,
577            &handlers,
578            move || {
579                let attempts_for_connect = Rc::clone(&attempts_for_connect);
580                async move {
581                    *attempts_for_connect.borrow_mut() += 1;
582                    Err::<ReconnectFakeSession, _>(WorkerError::Handshake {
583                        source: tonic::Status::unauthenticated("worker credentials were rejected"),
584                    })
585                }
586            },
587            move |delay| {
588                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
589                async move {
590                    sleeps_for_sleep.borrow_mut().push(delay);
591                }
592            },
593        )
594        .await;
595
596        assert!(result.is_err());
597        let Err(error) = result else { return };
598        assert_eq!(*attempts.borrow(), 1);
599        assert!(sleeps.borrow().is_empty());
600        assert!(!error.is_retryable());
601        assert!(matches!(
602            error.grpc_status().map(tonic::Status::code),
603            Some(tonic::Code::Unauthenticated)
604        ));
605        assert!(
606            error
607                .to_string()
608                .contains("worker credentials were rejected")
609        );
610    }
611
612    #[tokio::test]
613    async fn unavailable_transport_retries_until_attempts_exhausted() {
614        let config = test_config();
615        let attempts = Rc::new(RefCell::new(0usize));
616        let sleeps = Rc::new(RefCell::new(Vec::new()));
617        let activity_types = vec![String::from("charge-card")];
618        let handlers = activity_types.iter().cloned().collect::<BTreeSet<_>>();
619        let attempts_for_connect = Rc::clone(&attempts);
620        let sleeps_for_sleep = Rc::clone(&sleeps);
621
622        let result = reconnect_with_sleep(
623            &config,
624            activity_types,
625            &handlers,
626            move || {
627                let attempts_for_connect = Rc::clone(&attempts_for_connect);
628                async move {
629                    *attempts_for_connect.borrow_mut() += 1;
630                    Err::<ReconnectFakeSession, _>(WorkerError::Transport {
631                        source: tonic::Status::unavailable("engine unreachable"),
632                    })
633                }
634            },
635            move |delay| {
636                let sleeps_for_sleep = Rc::clone(&sleeps_for_sleep);
637                async move {
638                    sleeps_for_sleep.borrow_mut().push(delay);
639                }
640            },
641        )
642        .await;
643
644        assert!(result.is_err());
645        let Err(error) = result else { return };
646        assert_eq!(*attempts.borrow(), 3);
647        assert_eq!(
648            *sleeps.borrow(),
649            vec![Duration::from_millis(5), Duration::from_millis(10)]
650        );
651        assert!(error.is_retryable());
652        assert!(matches!(
653            error.grpc_status().map(tonic::Status::code),
654            Some(tonic::Code::Unavailable)
655        ));
656    }
657
658    #[tokio::test]
659    async fn re_reports_unacked_reports_without_removing_them() -> Result<(), WorkerError> {
660        let workflow_id = WorkflowId::new_v4();
661        let activity_id = ActivityId::from_sequence_position(7);
662        let output = Payload::new(ContentType::Json, b"{}".to_vec());
663        let mut tracker = UnackedResultTracker::new();
664        tracker.record(PendingActivityReport::Completed {
665            workflow_id: workflow_id.clone(),
666            activity_id: activity_id.clone(),
667            output: output.clone(),
668        });
669        let mut session = ReconnectFakeSession::default();
670
671        re_report_unacked(&tracker, &mut session).await?;
672
673        assert_eq!(tracker.len(), 1);
674        assert_eq!(
675            session.reports,
676            vec![RecordedReport::Completed(workflow_id, activity_id, output)]
677        );
678        Ok(())
679    }
680
681    #[derive(Default)]
682    struct ReconnectFakeSession {
683        handshakes: Vec<String>,
684        registrations: Vec<Vec<String>>,
685        reports: Vec<RecordedReport>,
686    }
687
688    /// Session whose registration is rejected by the server with a gRPC denial,
689    /// mirroring `aion-server` answering `PermissionDenied` for an ungranted
690    /// namespace.
691    struct DeniedRegistrationSession {
692        denial: tonic::Status,
693    }
694
695    #[async_trait]
696    impl WorkerSession for DeniedRegistrationSession {
697        async fn handshake(&mut self, _config: &WorkerConfig) -> Result<(), WorkerError> {
698            Ok(())
699        }
700
701        async fn register(
702            &mut self,
703            activity_types: Vec<String>,
704            available_handlers: &BTreeSet<String>,
705        ) -> Result<(), WorkerError> {
706            validate_activity_handlers(&activity_types, available_handlers)?;
707            Err(WorkerError::Registration {
708                source: Box::new(self.denial.clone()),
709            })
710        }
711
712        fn receive_tasks(&mut self) -> WorkerTaskStream {
713            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
714        }
715
716        async fn report_result(
717            &mut self,
718            workflow_id: WorkflowId,
719            activity_id: ActivityId,
720            result: Payload,
721        ) -> Result<(), WorkerError> {
722            drop((workflow_id, activity_id, result));
723            Err(WorkerError::Registration {
724                source: Box::new(self.denial.clone()),
725            })
726        }
727
728        async fn report_failure(
729            &mut self,
730            workflow_id: WorkflowId,
731            activity_id: ActivityId,
732            failure: ActivityError,
733        ) -> Result<(), WorkerError> {
734            drop((workflow_id, activity_id, failure));
735            Err(WorkerError::Registration {
736                source: Box::new(self.denial.clone()),
737            })
738        }
739
740        async fn send_heartbeat(
741            &mut self,
742            workflow_id: WorkflowId,
743            activity_id: ActivityId,
744            progress: Option<Payload>,
745        ) -> Result<(), WorkerError> {
746            drop((workflow_id, activity_id, progress));
747            Err(WorkerError::Registration {
748                source: Box::new(self.denial.clone()),
749            })
750        }
751    }
752
753    #[derive(Clone, Debug, PartialEq, Eq)]
754    enum RecordedReport {
755        Completed(WorkflowId, ActivityId, Payload),
756        Failed(WorkflowId, ActivityId, ActivityError),
757    }
758
759    #[async_trait]
760    impl WorkerSession for ReconnectFakeSession {
761        async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
762            self.handshakes.push(config.identity.clone());
763            Ok(())
764        }
765
766        async fn register(
767            &mut self,
768            activity_types: Vec<String>,
769            available_handlers: &BTreeSet<String>,
770        ) -> Result<(), WorkerError> {
771            validate_activity_handlers(&activity_types, available_handlers)?;
772            self.registrations.push(activity_types);
773            Ok(())
774        }
775
776        fn receive_tasks(&mut self) -> WorkerTaskStream {
777            Box::pin(stream::empty::<Result<WorkerSessionEvent, WorkerError>>())
778        }
779
780        async fn report_result(
781            &mut self,
782            workflow_id: WorkflowId,
783            activity_id: ActivityId,
784            result: Payload,
785        ) -> Result<(), WorkerError> {
786            self.reports
787                .push(RecordedReport::Completed(workflow_id, activity_id, result));
788            Ok(())
789        }
790
791        async fn report_failure(
792            &mut self,
793            workflow_id: WorkflowId,
794            activity_id: ActivityId,
795            failure: ActivityError,
796        ) -> Result<(), WorkerError> {
797            self.reports
798                .push(RecordedReport::Failed(workflow_id, activity_id, failure));
799            Ok(())
800        }
801
802        async fn send_heartbeat(
803            &mut self,
804            workflow_id: WorkflowId,
805            activity_id: ActivityId,
806            progress: Option<Payload>,
807        ) -> Result<(), WorkerError> {
808            drop((workflow_id, activity_id, progress));
809            Ok(())
810        }
811    }
812
813    fn test_config() -> WorkerConfig {
814        WorkerConfig::new(
815            "http://127.0.0.1:50051",
816            "payments",
817            "worker-a",
818            2,
819            ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 3),
820            None,
821        )
822    }
823
824    fn terminal_failure() -> ActivityError {
825        ActivityError {
826            kind: ActivityErrorKind::Terminal,
827            message: String::from("terminal"),
828            details: None,
829        }
830    }
831
832    #[test]
833    fn tracker_replaces_existing_activity_report() {
834        let workflow_id = WorkflowId::new_v4();
835        let activity_id = ActivityId::from_sequence_position(9);
836        let mut tracker = UnackedResultTracker::new();
837        tracker.record(PendingActivityReport::Completed {
838            workflow_id: workflow_id.clone(),
839            activity_id: activity_id.clone(),
840            output: Payload::new(ContentType::Json, b"{}".to_vec()),
841        });
842        tracker.record(PendingActivityReport::Failed {
843            workflow_id: workflow_id.clone(),
844            activity_id: activity_id.clone(),
845            failure: terminal_failure(),
846        });
847
848        assert_eq!(tracker.len(), 1);
849        assert!(matches!(
850            tracker.get(&workflow_id, &activity_id),
851            Some(PendingActivityReport::Failed { .. })
852        ));
853    }
854}