Skip to main content

aion_worker/
worker.rs

1//! `Worker` builder, run loop, and shutdown wiring.
2
3use std::collections::BTreeSet;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use tracing::{error, info, warn};
12
13use crate::activity::{ActivityRegistry, HandlerFuture};
14use crate::config::WorkerConfig;
15use crate::context::ActivityContext;
16use crate::error::WorkerError;
17use crate::protocol::reconnect::{
18    ReconnectBackoff, UnackedResultTracker, re_report_unacked, reconnect_with_backoff,
19    register_connected_session,
20};
21use crate::protocol::{GrpcWorkerSession, WorkerSession};
22use crate::runtime::{
23    NoShutdown, ServeEnd, SessionHealth, serve_activity_tasks, serve_activity_tasks_until,
24};
25
26/// Builder for a configured worker and its registered typed activities.
27#[must_use]
28pub struct WorkerBuilder {
29    config: WorkerConfig,
30    activities: ActivityRegistry,
31}
32
33impl WorkerBuilder {
34    /// Creates a builder for a worker using the supplied config.
35    pub fn new(config: WorkerConfig) -> Self {
36        Self {
37            config,
38            activities: ActivityRegistry::new(),
39        }
40    }
41
42    /// Registers one typed activity handler on the builder.
43    ///
44    /// # Errors
45    ///
46    /// Returns [`WorkerError::Registration`] when the activity type is duplicate.
47    pub fn register_activity<Input, Output, Handler>(
48        mut self,
49        activity_type: impl Into<String>,
50        handler: Handler,
51    ) -> Result<Self, WorkerError>
52    where
53        Input: Serialize + DeserializeOwned + Send + Sync + 'static,
54        Output: Serialize + Send + Sync + 'static,
55        Handler: for<'context> Fn(Input, &'context ActivityContext) -> HandlerFuture<'context, Output>
56            + Send
57            + Sync
58            + 'static,
59    {
60        self.activities = self.activities.register_activity(activity_type, handler)?;
61        Ok(self)
62    }
63
64    /// Builds the worker after validating that it has at least one activity.
65    ///
66    /// # Errors
67    ///
68    /// Returns [`WorkerError::Registration`] when no activities are registered.
69    pub fn build(self) -> Result<Worker, WorkerError> {
70        if self.activities.is_empty() {
71            return Err(WorkerError::registration(EmptyActivitySet));
72        }
73        let available_handlers = self.activities.activity_types();
74        let activity_types = available_handlers.iter().cloned().collect();
75        Ok(Worker {
76            config: self.config,
77            activity_types,
78            available_handlers,
79            activities: Arc::new(self.activities),
80        })
81    }
82}
83
/// Configured Rust worker with typed activity handlers.
///
/// Produced by [`WorkerBuilder::build`]; serving starts with [`Worker::run`],
/// [`Worker::run_until`], or one of the injected-session seams.
#[must_use]
pub struct Worker {
    /// Settings used to connect, register, back off, and serve.
    config: WorkerConfig,
    /// Activity types sent to the engine each time a session registers.
    activity_types: Vec<String>,
    /// Handler names passed alongside `activity_types` for registration validation.
    available_handlers: BTreeSet<String>,
    /// Handler registry shared with each serve loop via `Arc::clone`.
    activities: Arc<ActivityRegistry>,
}

impl Worker {
    /// Starts a new builder for the supplied config.
    pub fn builder(config: WorkerConfig) -> WorkerBuilder {
        WorkerBuilder::new(config)
    }

    /// Returns the activity types this worker registers with the engine.
    #[must_use]
    pub fn activity_types(&self) -> &[String] {
        &self.activity_types
    }

    /// Returns the handler-name set used for registration validation.
    #[must_use]
    pub fn available_handlers(&self) -> &BTreeSet<String> {
        &self.available_handlers
    }

    /// Connects to the configured endpoint, registers activities, and serves indefinitely.
    ///
    /// Session establishment goes through the bounded-backoff reconnect
    /// machinery configured in [`WorkerConfig::reconnect`], and retryable
    /// mid-run transport drops — including clean server-side stream closes —
    /// re-establish through the same machinery: the worker re-registers its
    /// activity types, re-reports every unacknowledged activity result (the
    /// engine ingests reports idempotently by `ActivityId`), and resumes
    /// serving. Deterministic `PermissionDenied` / `Unauthenticated` denials
    /// surface after exactly one attempt. Without a shutdown signal the run
    /// ends only on a non-retryable error or drop-budget exhaustion; see
    /// [`crate::config::ReconnectConfig`] for the budget-reset semantics.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError`] for connection, registration, dispatch, heartbeat, or report failures.
    pub async fn run(self) -> Result<(), WorkerError> {
        // A shutdown future that never completes: only errors end the run.
        self.run_until(std::future::pending::<()>()).await
    }

    /// Connects to the configured endpoint, registers activities, and serves until shutdown fires.
    ///
    /// Establishment and mid-run reconnect behaviour match [`Worker::run`].
    /// On shutdown, no new tasks are pulled, in-flight activity contexts are
    /// marked cancelled, and all in-flight activities are drained before this
    /// returns; shutdown signalled during a reconnect or backoff wins
    /// promptly without waiting out the backoff delay.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError`] for connection, registration, dispatch, heartbeat, or report failures.
    pub async fn run_until<Shutdown>(self, shutdown: Shutdown) -> Result<(), WorkerError>
    where
        Shutdown: Future<Output = ()> + Send,
    {
        // The connector closure owns one config copy and hands each gRPC
        // connect attempt its own clone.
        let config = self.config.clone();
        self.run_with_connector_until(move || GrpcWorkerSession::connect(config.clone()), shutdown)
            .await
    }

    /// Runs the reconnect-aware serve loop over an injected session factory.
    ///
    /// Session establishment goes through
    /// [`reconnect_with_backoff`](crate::protocol::reconnect::reconnect_with_backoff):
    /// transient failures retry up to the configured `reconnect.max_attempts`
    /// with bounded exponential backoff, while `PermissionDenied` /
    /// `Unauthenticated` denials surface after exactly one attempt. When an
    /// established session drops retryably mid-run — a retryable transport
    /// failure or a clean server-side stream close, both count — the worker
    /// drains in-flight activities into the unacked tracker, backs off,
    /// reconnects through the same machinery (re-registering its activity
    /// types), re-reports every unacknowledged result, and resumes serving.
    ///
    /// Mid-run drops share one cumulative budget of `reconnect.max_attempts`,
    /// matching the Python and TypeScript workers, and the budget resets to
    /// zero once a session proves healthy: it served at least one task, or it
    /// stayed connected longer than `reconnect.max_backoff` (measured
    /// monotonically from successful registration to the moment the stream
    /// ended or dropped — post-drop draining of in-flight activities never
    /// extends it). See
    /// [`crate::config::ReconnectConfig`]. The run therefore ends only on
    /// shutdown, a non-retryable error, or budget exhaustion — never merely
    /// because the server closed the stream. At most one session is alive at
    /// a time, and a shutdown signalled during a reconnect or backoff wins
    /// promptly (returning `Ok` when the pending drop was a clean close).
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError`] when establishment attempts are exhausted or
    /// denied, when a non-retryable error occurs mid-run, when the mid-run
    /// drop budget is exhausted ([`WorkerError::CleanCloseExhausted`] when
    /// the exhausting drops were clean closes), or when shutdown interrupts
    /// an unrecovered error drop.
    pub async fn run_with_connector_until<S, F, Fut, Shutdown>(
        self,
        mut connect: F,
        shutdown: Shutdown,
    ) -> Result<(), WorkerError>
    where
        S: WorkerSession,
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<S, WorkerError>>,
        Shutdown: Future<Output = ()> + Send,
    {
        // Backoff schedule derived from the reconnect config; an error here
        // ends the run before any connect attempt.
        let backoff = ReconnectBackoff::from_config(&self.config)?;
        // Outlives individual sessions so results from a dropped session can
        // be re-reported on the next one.
        let mut tracker = UnackedResultTracker::new();
        // Pin the caller's shutdown future on this frame and wrap it so that
        // establishment, serving, and backoff can each await it without the
        // inner future being polled after it completes.
        tokio::pin!(shutdown);
        let mut shutdown = SharedShutdown::new(shutdown);
        // Mid-run drops counted against the budget since the last session
        // that proved healthy.
        let mut drop_failures = 0_usize;
        // Error from the most recent drop (`None` for clean closes and before
        // the first drop); surfaced if shutdown wins the next establishment.
        // Overwritten after every drop.
        let mut recovery_error: Option<WorkerError> = None;

        loop {
            // `biased` polls shutdown first, so an already-signalled shutdown
            // wins over starting another establishment attempt.
            let connected = tokio::select! {
                biased;
                () = shutdown.wait() => {
                    return recovery_error.take().map_or(Ok(()), Err);
                }
                result = reconnect_with_backoff(
                    &self.config,
                    self.activity_types.clone(),
                    &self.available_handlers,
                    &mut connect,
                ) => result,
            };
            // Establishment exhausted or denied: the run ends with that error.
            let mut session = connected?;
            let session_started = tokio::time::Instant::now();
            let mut health = SessionHealth::default();
            // Replay unacknowledged results before serving new tasks; a replay
            // failure is classified below exactly like a serve-loop error.
            let served = match re_report_unacked(&tracker, &mut session).await {
                Ok(()) => {
                    serve_activity_tasks_until(
                        &self.config,
                        &mut session,
                        Arc::clone(&self.activities),
                        &mut tracker,
                        &mut health,
                        shutdown.wait(),
                    )
                    .await
                }
                Err(report_error) => Err(report_error),
            };
            // Release the session before any backoff so at most one is alive.
            drop(session);
            let cause = match served {
                Ok(ServeEnd::Shutdown) => return Ok(()),
                Ok(ServeEnd::StreamClosed) => {
                    // A close that coincides with shutdown is a graceful end.
                    if shutdown.fired() {
                        return Ok(());
                    }
                    DropCause::CleanClose
                }
                Err(error) if !error.is_retryable() => {
                    // NOTE(review): every non-retryable error lands here, not
                    // only server denials — confirm the log wording.
                    error!(error = %error, "worker session denied by server; not reconnecting");
                    return Err(error);
                }
                Err(error) => {
                    // Shutdown already signalled: surface the error instead of reconnecting.
                    if shutdown.fired() {
                        return Err(error);
                    }
                    DropCause::Failure(error)
                }
            };
            // Connected lifetime is measured from successful registration to
            // the moment the stream ended — never to the end of the post-drop
            // drain, which would let a long-running in-flight handler
            // masquerade as a healthy session and reset the budget forever.
            // A replay failure never enters the serve loop, so its drop
            // moment is now.
            let connected_for = health
                .stream_ended_at
                .unwrap_or_else(tokio::time::Instant::now)
                .saturating_duration_since(session_started);
            let proved_healthy = health.tasks_reported > 0 || connected_for > backoff.max_delay();
            if proved_healthy && drop_failures > 0 {
                info!(
                    drop_failures,
                    tasks_reported = health.tasks_reported,
                    "worker session proved healthy; drop budget reset"
                );
                drop_failures = 0;
            }
            // Count this drop; once the count reaches `backoff.attempts()` the
            // run ends with the classified exhaustion error.
            drop_failures += 1;
            if drop_failures >= backoff.attempts() {
                let error = cause.into_exhaustion_error();
                error!(
                    drop_failures,
                    error = %error,
                    "worker session drop budget exhausted; not reconnecting"
                );
                return Err(error);
            }
            let delay = backoff.delay_for_attempt(drop_failures);
            warn!(
                drop_failures,
                delay_ms = delay.as_millis(),
                cause = %cause,
                "worker session dropped; reconnecting after backoff"
            );
            // Shutdown cuts the backoff sleep short; `biased` makes it win ties.
            let shutdown_won = tokio::select! {
                biased;
                () = shutdown.wait() => true,
                () = tokio::time::sleep(delay) => false,
            };
            if shutdown_won {
                return cause.into_shutdown_result();
            }
            // Remember this drop's error in case shutdown interrupts re-establishment.
            recovery_error = cause.into_recovery_error();
        }
    }

    /// Test seam that handshakes, registers, and serves an injected session until its stream ends.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError`] for registration, dispatch, heartbeat, or report failures.
    pub async fn run_with_session<S>(self, session: S) -> Result<S, WorkerError>
    where
        S: WorkerSession,
    {
        self.run_with_session_until(session, std::future::pending::<()>())
            .await
    }

    /// Test seam that handshakes, registers, and serves an injected session until shutdown fires.
    ///
    /// Unlike [`Worker::run_with_connector_until`], this never reconnects and
    /// starts with an empty unacked-result tracker; the session is returned
    /// so callers can inspect it.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError`] for registration, dispatch, heartbeat, or report failures.
    pub async fn run_with_session_until<S, Shutdown>(
        self,
        session: S,
        shutdown: Shutdown,
    ) -> Result<S, WorkerError>
    where
        S: WorkerSession,
        Shutdown: Future<Output = ()> + Send,
    {
        let mut session = register_connected_session(
            session,
            &self.config,
            self.activity_types.clone(),
            &self.available_handlers,
        )
        .await?;
        let mut tracker = UnackedResultTracker::new();
        let mut health = SessionHealth::default();
        serve_activity_tasks_until(
            &self.config,
            &mut session,
            self.activities,
            &mut tracker,
            &mut health,
            shutdown,
        )
        .await?;
        Ok(session)
    }
}
348
349/// Cause of a retryable mid-run session drop carried across one recovery cycle.
350enum DropCause {
351    /// The session ended with a retryable error.
352    Failure(WorkerError),
353    /// The server closed the stream cleanly while the run was still active.
354    CleanClose,
355}
356
357impl DropCause {
358    /// The classified error surfaced when this drop exhausts the budget.
359    fn into_exhaustion_error(self) -> WorkerError {
360        match self {
361            Self::Failure(error) => error,
362            Self::CleanClose => WorkerError::CleanCloseExhausted,
363        }
364    }
365
366    /// Run outcome when shutdown wins the post-drop backoff: an error drop
367    /// surfaces its error, a clean close is a graceful end.
368    fn into_shutdown_result(self) -> Result<(), WorkerError> {
369        match self {
370            Self::Failure(error) => Err(error),
371            Self::CleanClose => Ok(()),
372        }
373    }
374
375    /// Error to surface if shutdown wins the recovery establishment select.
376    fn into_recovery_error(self) -> Option<WorkerError> {
377        match self {
378            Self::Failure(error) => Some(error),
379            Self::CleanClose => None,
380        }
381    }
382}
383
384impl std::fmt::Display for DropCause {
385    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            Self::Failure(error) => write!(formatter, "{error}"),
388            Self::CleanClose => write!(formatter, "server closed the worker stream cleanly"),
389        }
390    }
391}
392
393/// Level-triggered, re-pollable view over the caller's one-shot shutdown future.
394///
395/// The run loop observes the same shutdown signal from several places —
396/// session establishment, the serving loop, and reconnect backoff sleeps —
397/// but a `Future` must not be polled again once it has completed. This
398/// wrapper polls the underlying future at most to completion and then
399/// latches, so every subsequent [`SharedShutdown::wait`] resolves
400/// immediately.
401struct SharedShutdown<'a, S> {
402    inner: Pin<&'a mut S>,
403    fired: bool,
404}
405
406impl<'a, S> SharedShutdown<'a, S>
407where
408    S: Future<Output = ()> + Send,
409{
410    const fn new(inner: Pin<&'a mut S>) -> Self {
411        Self {
412            inner,
413            fired: false,
414        }
415    }
416
417    /// Returns whether the shutdown future has already completed.
418    const fn fired(&self) -> bool {
419        self.fired
420    }
421
422    /// Waits for shutdown; resolves immediately once it has fired before.
423    fn wait(&mut self) -> impl Future<Output = ()> + Send {
424        std::future::poll_fn(|context| {
425            if self.fired {
426                return Poll::Ready(());
427            }
428            match self.inner.as_mut().poll(context) {
429                Poll::Ready(()) => {
430                    self.fired = true;
431                    Poll::Ready(())
432                }
433                Poll::Pending => Poll::Pending,
434            }
435        })
436    }
437}
438
439/// Connects and serves an already-built worker with the default non-shutdown future.
440///
441/// # Errors
442///
443/// Returns [`WorkerError`] for registration, dispatch, heartbeat, or report failures.
444pub async fn run_worker_with_session<S>(worker: Worker, session: S) -> Result<S, WorkerError>
445where
446    S: WorkerSession,
447{
448    worker.run_with_session(session).await
449}
450
451/// Error returned when a worker is built without any registered activities.
452#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
453#[error("worker must register at least one activity handler")]
454pub struct EmptyActivitySet;
455
456fn _assert_live_session_type() {
457    let _ = std::mem::size_of::<GrpcWorkerSession>();
458    let _ = std::mem::size_of::<NoShutdown>();
459    let _ = serve_activity_tasks::<GrpcWorkerSession, ActivityRegistry>;
460}
461
462#[cfg(test)]
463mod tests {
464    use std::collections::BTreeSet;
465    use std::sync::Arc;
466    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
467    use std::time::Duration;
468
469    use aion_core::{ActivityError, ActivityId, ContentType, Payload, WorkflowId};
470    use aion_proto::{ProtoActivityId, ProtoActivityTask, ProtoPayload, ProtoWorkflowId};
471    use async_trait::async_trait;
472    use futures::StreamExt as _;
473    use futures::stream;
474    use serde::{Deserialize, Serialize};
475    use tokio::sync::{Notify, mpsc};
476
477    use super::{Worker, WorkerBuilder};
478    use crate::config::{ReconnectConfig, WorkerConfig};
479    use crate::context::ActivityContext;
480    use crate::error::WorkerError;
481    use crate::protocol::{
482        WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
483    };
484
485    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
486    struct TestInput {
487        value: i32,
488    }
489
490    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
491    struct TestOutput {
492        value: i32,
493    }
494
495    struct ChannelSession {
496        receiver: Option<mpsc::Receiver<Result<WorkerSessionEvent, WorkerError>>>,
497        reports: Vec<RecordedReport>,
498        registered: Vec<String>,
499    }
500
501    #[derive(Clone, Debug, PartialEq, Eq)]
502    enum RecordedReport {
503        Completed(WorkflowId, ActivityId, Payload),
504        Failed(WorkflowId, ActivityId, ActivityError),
505    }
506
507    #[async_trait]
508    impl WorkerSession for ChannelSession {
509        async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
510            drop(config.clone());
511            Ok(())
512        }
513
514        async fn register(
515            &mut self,
516            activity_types: Vec<String>,
517            available_handlers: &BTreeSet<String>,
518        ) -> Result<(), WorkerError> {
519            validate_activity_handlers(&activity_types, available_handlers)?;
520            self.registered = activity_types;
521            Ok(())
522        }
523
524        fn receive_tasks(&mut self) -> WorkerTaskStream {
525            match self.receiver.take() {
526                Some(receiver) => Box::pin(tokio_stream::wrappers::ReceiverStream::new(receiver)),
527                None => Box::pin(stream::empty()),
528            }
529        }
530
531        async fn report_result(
532            &mut self,
533            workflow_id: WorkflowId,
534            activity_id: ActivityId,
535            result: Payload,
536        ) -> Result<(), WorkerError> {
537            self.reports
538                .push(RecordedReport::Completed(workflow_id, activity_id, result));
539            Ok(())
540        }
541
542        async fn report_failure(
543            &mut self,
544            workflow_id: WorkflowId,
545            activity_id: ActivityId,
546            failure: ActivityError,
547        ) -> Result<(), WorkerError> {
548            self.reports
549                .push(RecordedReport::Failed(workflow_id, activity_id, failure));
550            Ok(())
551        }
552
553        async fn send_heartbeat(
554            &mut self,
555            workflow_id: WorkflowId,
556            activity_id: ActivityId,
557            progress: Option<Payload>,
558        ) -> Result<(), WorkerError> {
559            drop((workflow_id, activity_id, progress));
560            Ok(())
561        }
562    }
563
564    #[test]
565    fn empty_worker_is_rejected() {
566        let error = WorkerBuilder::new(test_config()).build().err();
567
568        assert!(error.is_some_and(|error| error.to_string().contains("at least one activity")));
569    }
570
571    #[test]
572    fn worker_collects_two_activity_registration_names() -> Result<(), WorkerError> {
573        let worker = two_activity_worker()?;
574        let expected = [String::from("double"), String::from("increment")]
575            .into_iter()
576            .collect::<BTreeSet<_>>();
577
578        assert_eq!(worker.available_handlers(), &expected);
579        assert_eq!(
580            worker.activity_types(),
581            &[String::from("double"), String::from("increment")]
582        );
583        Ok(())
584    }
585
586    #[tokio::test]
587    async fn worker_registers_names_with_session() -> Result<(), WorkerError> {
588        let worker = two_activity_worker()?;
589        let session = worker
590            .run_with_session(ChannelSession {
591                receiver: None,
592                reports: Vec::new(),
593                registered: Vec::new(),
594            })
595            .await?;
596
597        assert_eq!(
598            session.registered,
599            vec![String::from("double"), String::from("increment")]
600        );
601        Ok(())
602    }
603
604    #[tokio::test]
605    async fn shutdown_waits_for_slow_in_flight_activity() -> Result<(), WorkerError> {
606        let workflow_id = WorkflowId::new_v4();
607        let activity_id = ActivityId::from_sequence_position(7);
608        let (sender, receiver) = mpsc::channel(2);
609        sender
610            .send(Ok(WorkerSessionEvent::Task(proto_task(
611                workflow_id,
612                activity_id.clone(),
613                "slow",
614                0,
615            ))))
616            .await
617            .map_err(WorkerError::decode)?;
618        let release = Arc::new(AtomicBool::new(false));
619        let started = Arc::new(AtomicUsize::new(0));
620        let worker = Worker::builder(test_config())
621            .register_activity("slow", {
622                let release = Arc::clone(&release);
623                let started = Arc::clone(&started);
624                move |input: TestInput, context: &ActivityContext| {
625                    let release = Arc::clone(&release);
626                    let started = Arc::clone(&started);
627                    Box::pin(async move {
628                        let _ = input;
629                        started.fetch_add(1, Ordering::SeqCst);
630                        context.cancelled().await;
631                        while !release.load(Ordering::SeqCst) {
632                            tokio::time::sleep(Duration::from_millis(1)).await;
633                        }
634                        Ok(TestOutput { value: 1 })
635                    })
636                }
637            })?
638            .build()?;
639        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
640        let session = ChannelSession {
641            receiver: Some(receiver),
642            reports: Vec::new(),
643            registered: Vec::new(),
644        };
645        let handle = tokio::spawn(async move {
646            worker
647                .run_with_session_until(session, async {
648                    let _ = shutdown_receiver.await;
649                })
650                .await
651        });
652
653        wait_until_started(&started).await;
654        shutdown_sender
655            .send(())
656            .map_err(|()| WorkerError::decode(SendFailed))?;
657        tokio::time::sleep(Duration::from_millis(20)).await;
658        assert!(!handle.is_finished());
659        release.store(true, Ordering::SeqCst);
660        drop(sender);
661        let session = handle.await.map_err(WorkerError::decode)??;
662
663        assert_eq!(session.reports.len(), 1);
664        assert!(matches!(
665            &session.reports[0],
666            RecordedReport::Completed(_, reported_id, _) if reported_id == &activity_id
667        ));
668        Ok(())
669    }
670
671    fn two_activity_worker() -> Result<Worker, WorkerError> {
672        two_activity_worker_with(test_config())
673    }
674
675    fn two_activity_worker_with(config: WorkerConfig) -> Result<Worker, WorkerError> {
676        Worker::builder(config)
677            .register_activity("double", |input: TestInput, context| {
678                Box::pin(async move {
679                    let _ = context;
680                    Ok(TestOutput {
681                        value: input.value * 2,
682                    })
683                })
684            })?
685            .register_activity("increment", |input: TestInput, context| {
686                Box::pin(async move {
687                    let _ = context;
688                    Ok(TestOutput {
689                        value: input.value + 1,
690                    })
691                })
692            })?
693            .build()
694    }
695
696    fn proto_task(
697        workflow_id: WorkflowId,
698        activity_id: ActivityId,
699        activity_type: &str,
700        value: i32,
701    ) -> ProtoActivityTask {
702        ProtoActivityTask {
703            workflow_id: Some(ProtoWorkflowId::from(workflow_id)),
704            activity_id: Some(ProtoActivityId::from(activity_id)),
705            activity_type: activity_type.to_owned(),
706            input: Some(ProtoPayload::from(Payload::new(
707                ContentType::Json,
708                format!("{{\"value\":{value}}}").into_bytes(),
709            ))),
710        }
711    }
712
713    async fn wait_until_started(started: &AtomicUsize) {
714        while started.load(Ordering::SeqCst) == 0 {
715            tokio::time::sleep(Duration::from_millis(1)).await;
716        }
717    }
718
719    fn test_config() -> WorkerConfig {
720        test_config_with(ReconnectConfig::new(
721            Duration::from_millis(5),
722            Duration::from_millis(20),
723            3,
724        ))
725    }
726
727    fn test_config_with(reconnect: ReconnectConfig) -> WorkerConfig {
728        WorkerConfig::new(
729            "http://127.0.0.1:50051",
730            "payments",
731            "worker-a",
732            1,
733            reconnect,
734            None,
735        )
736    }
737
738    fn slow_reconnect_config() -> WorkerConfig {
739        test_config_with(ReconnectConfig::new(
740            Duration::from_secs(5),
741            Duration::from_secs(10),
742            5,
743        ))
744    }
745
746    #[derive(Debug, thiserror::Error)]
747    #[error("failed to send shutdown signal")]
748    struct SendFailed;
749
750    #[derive(Debug, thiserror::Error)]
751    #[error("expected the worker run to fail")]
752    struct UnexpectedSuccess;
753
754    #[derive(Debug, thiserror::Error)]
755    #[error("expected a completed activity report")]
756    struct UnexpectedReportShape;
757
758    /// Per-session record emitted by [`ScriptedSession`] for post-run assertions.
759    #[derive(Debug)]
760    enum SessionLog {
761        Registered(usize, Vec<String>),
762        Reported(usize, RecordedReport),
763    }
764
765    /// Factory-injected session whose stream contents, registration outcome,
766    /// and report behaviour are scripted per connection attempt.
767    struct ScriptedSession {
768        index: usize,
769        log: mpsc::UnboundedSender<SessionLog>,
770        events: Vec<Result<WorkerSessionEvent, WorkerError>>,
771        fail_reports: bool,
772        register_denial: Option<tonic::Status>,
773        /// Delays the receive stream's first event so paused-clock tests can
774        /// script a session that outlives the configured max backoff.
775        delay_stream: Option<Duration>,
776    }
777
778    #[async_trait]
779    impl WorkerSession for ScriptedSession {
780        async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
781            drop(config.clone());
782            Ok(())
783        }
784
785        async fn register(
786            &mut self,
787            activity_types: Vec<String>,
788            available_handlers: &BTreeSet<String>,
789        ) -> Result<(), WorkerError> {
790            validate_activity_handlers(&activity_types, available_handlers)?;
791            if let Some(denial) = self.register_denial.take() {
792                return Err(WorkerError::Registration {
793                    source: Box::new(denial),
794                });
795            }
796            self.log
797                .send(SessionLog::Registered(self.index, activity_types))
798                .map_err(WorkerError::decode)
799        }
800
801        fn receive_tasks(&mut self) -> WorkerTaskStream {
802            let events = std::mem::take(&mut self.events);
803            match self.delay_stream.take() {
804                Some(delay) => Box::pin(
805                    stream::once(async move {
806                        tokio::time::sleep(delay).await;
807                        stream::iter(events)
808                    })
809                    .flatten(),
810                ),
811                None => Box::pin(stream::iter(events)),
812            }
813        }
814
815        async fn report_result(
816            &mut self,
817            workflow_id: WorkflowId,
818            activity_id: ActivityId,
819            result: Payload,
820        ) -> Result<(), WorkerError> {
821            if self.fail_reports {
822                return Err(WorkerError::Transport {
823                    source: tonic::Status::unavailable("transport dropped before result ack"),
824                });
825            }
826            self.log
827                .send(SessionLog::Reported(
828                    self.index,
829                    RecordedReport::Completed(workflow_id, activity_id, result),
830                ))
831                .map_err(WorkerError::decode)
832        }
833
834        async fn report_failure(
835            &mut self,
836            workflow_id: WorkflowId,
837            activity_id: ActivityId,
838            failure: ActivityError,
839        ) -> Result<(), WorkerError> {
840            if self.fail_reports {
841                return Err(WorkerError::Transport {
842                    source: tonic::Status::unavailable("transport dropped before failure ack"),
843                });
844            }
845            self.log
846                .send(SessionLog::Reported(
847                    self.index,
848                    RecordedReport::Failed(workflow_id, activity_id, failure),
849                ))
850                .map_err(WorkerError::decode)
851        }
852
853        async fn send_heartbeat(
854            &mut self,
855            workflow_id: WorkflowId,
856            activity_id: ActivityId,
857            progress: Option<Payload>,
858        ) -> Result<(), WorkerError> {
859            drop((workflow_id, activity_id, progress));
860            Ok(())
861        }
862    }
863
864    #[tokio::test]
865    async fn establishment_retries_transient_failures_until_attempts_exhausted()
866    -> Result<(), WorkerError> {
867        let worker = two_activity_worker()?;
868        let attempts = Arc::new(AtomicUsize::new(0));
869        let connect = {
870            let attempts = Arc::clone(&attempts);
871            move || {
872                attempts.fetch_add(1, Ordering::SeqCst);
873                async move {
874                    Err::<ScriptedSession, _>(WorkerError::Transport {
875                        source: tonic::Status::unavailable("engine unreachable"),
876                    })
877                }
878            }
879        };
880
881        let result = worker
882            .run_with_connector_until(connect, std::future::pending::<()>())
883            .await;
884
885        assert_eq!(attempts.load(Ordering::SeqCst), 3);
886        let Err(error) = result else {
887            return Err(WorkerError::decode(UnexpectedSuccess));
888        };
889        assert!(error.is_retryable());
890        assert!(matches!(
891            error.grpc_status().map(tonic::Status::code),
892            Some(tonic::Code::Unavailable)
893        ));
894        Ok(())
895    }
896
897    #[tokio::test]
898    async fn establishment_denial_surfaces_after_one_attempt() -> Result<(), WorkerError> {
899        let worker = two_activity_worker()?;
900        let attempts = Arc::new(AtomicUsize::new(0));
901        let (log_sender, log_receiver) = mpsc::unbounded_channel();
902        let connect = {
903            let attempts = Arc::clone(&attempts);
904            move || {
905                attempts.fetch_add(1, Ordering::SeqCst);
906                let log = log_sender.clone();
907                async move {
908                    Ok(ScriptedSession {
909                        index: 1,
910                        log,
911                        events: Vec::new(),
912                        fail_reports: false,
913                        register_denial: Some(tonic::Status::permission_denied(
914                            "namespace `payments` is not granted to subject `worker-a`",
915                        )),
916                        delay_stream: None,
917                    })
918                }
919            }
920        };
921
922        let result = worker
923            .run_with_connector_until(connect, std::future::pending::<()>())
924            .await;
925
926        assert_eq!(attempts.load(Ordering::SeqCst), 1);
927        let Err(error) = result else {
928            return Err(WorkerError::decode(UnexpectedSuccess));
929        };
930        assert!(!error.is_retryable());
931        assert!(matches!(
932            error.grpc_status().map(tonic::Status::code),
933            Some(tonic::Code::PermissionDenied)
934        ));
935        assert_eq!(
936            error.grpc_status().map(tonic::Status::message),
937            Some("namespace `payments` is not granted to subject `worker-a`")
938        );
939        drop(log_receiver);
940        Ok(())
941    }
942
943    #[tokio::test]
944    async fn mid_run_drop_reconnects_re_registers_and_re_reports_unacked() -> Result<(), WorkerError>
945    {
946        let workflow_id = WorkflowId::new_v4();
947        let activity_id = ActivityId::from_sequence_position(3);
948        let worker = two_activity_worker()?;
949        let attempts = Arc::new(AtomicUsize::new(0));
950        let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
951        let connect = {
952            let attempts = Arc::clone(&attempts);
953            let log_sender = log_sender.clone();
954            let workflow_id = workflow_id.clone();
955            let activity_id = activity_id.clone();
956            move || {
957                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
958                let log = log_sender.clone();
959                let task = proto_task(workflow_id.clone(), activity_id.clone(), "double", 21);
960                async move {
961                    if attempt == 1 {
962                        Ok(ScriptedSession {
963                            index: 1,
964                            log,
965                            events: vec![Ok(WorkerSessionEvent::Task(task))],
966                            fail_reports: true,
967                            register_denial: None,
968                            delay_stream: None,
969                        })
970                    } else if attempt == 2 {
971                        Ok(ScriptedSession {
972                            index: attempt,
973                            log,
974                            events: Vec::new(),
975                            fail_reports: false,
976                            register_denial: None,
977                            delay_stream: None,
978                        })
979                    } else {
980                        // A clean close no longer ends the run, so the third
981                        // establishment is denied deterministically to end it.
982                        Ok(ScriptedSession {
983                            index: attempt,
984                            log,
985                            events: Vec::new(),
986                            fail_reports: false,
987                            register_denial: Some(tonic::Status::permission_denied(
988                                "namespace `payments` revoked for subject `worker-a`",
989                            )),
990                            delay_stream: None,
991                        })
992                    }
993                }
994            }
995        };
996
997        let result = worker
998            .run_with_connector_until(connect, std::future::pending::<()>())
999            .await;
1000
1001        drop(log_sender);
1002        let mut registrations = Vec::new();
1003        let mut reports = Vec::new();
1004        while let Some(entry) = log_receiver.recv().await {
1005            match entry {
1006                SessionLog::Registered(index, types) => registrations.push((index, types)),
1007                SessionLog::Reported(index, report) => reports.push((index, report)),
1008            }
1009        }
1010        let Err(error) = result else {
1011            return Err(WorkerError::decode(UnexpectedSuccess));
1012        };
1013        assert!(!error.is_retryable());
1014        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1015        let expected_types = vec![String::from("double"), String::from("increment")];
1016        assert_eq!(
1017            registrations,
1018            vec![(1, expected_types.clone()), (2, expected_types)]
1019        );
1020        assert_eq!(reports.len(), 1);
1021        let (session_index, report) = &reports[0];
1022        assert_eq!(*session_index, 2);
1023        let RecordedReport::Completed(reported_workflow, reported_id, payload) = report else {
1024            return Err(WorkerError::decode(UnexpectedReportShape));
1025        };
1026        assert_eq!(reported_workflow, &workflow_id);
1027        assert_eq!(reported_id, &activity_id);
1028        let output: TestOutput =
1029            serde_json::from_slice(payload.bytes()).map_err(WorkerError::decode)?;
1030        assert_eq!(output.value, 42);
1031        Ok(())
1032    }
1033
1034    #[tokio::test]
1035    async fn mid_run_drop_re_reports_unacked_results_for_all_workflows() -> Result<(), WorkerError>
1036    {
1037        let first_workflow = WorkflowId::new_v4();
1038        let second_workflow = WorkflowId::new_v4();
1039        let activity_id = ActivityId::from_sequence_position(3);
1040        let worker = two_activity_worker()?;
1041        let attempts = Arc::new(AtomicUsize::new(0));
1042        let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1043        let connect = {
1044            let attempts = Arc::clone(&attempts);
1045            let log_sender = log_sender.clone();
1046            let first_workflow = first_workflow.clone();
1047            let second_workflow = second_workflow.clone();
1048            let activity_id = activity_id.clone();
1049            move || {
1050                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1051                let log = log_sender.clone();
1052                let first_task =
1053                    proto_task(first_workflow.clone(), activity_id.clone(), "double", 10);
1054                let second_task =
1055                    proto_task(second_workflow.clone(), activity_id.clone(), "double", 20);
1056                async move {
1057                    if attempt == 1 {
1058                        Ok(ScriptedSession {
1059                            index: 1,
1060                            log,
1061                            events: vec![
1062                                Ok(WorkerSessionEvent::Task(first_task)),
1063                                Ok(WorkerSessionEvent::Task(second_task)),
1064                            ],
1065                            fail_reports: true,
1066                            register_denial: None,
1067                            delay_stream: None,
1068                        })
1069                    } else if attempt == 2 {
1070                        Ok(ScriptedSession {
1071                            index: attempt,
1072                            log,
1073                            events: Vec::new(),
1074                            fail_reports: false,
1075                            register_denial: None,
1076                            delay_stream: None,
1077                        })
1078                    } else {
1079                        // A clean close no longer ends the run, so the third
1080                        // establishment is denied deterministically to end it.
1081                        Ok(ScriptedSession {
1082                            index: attempt,
1083                            log,
1084                            events: Vec::new(),
1085                            fail_reports: false,
1086                            register_denial: Some(tonic::Status::permission_denied(
1087                                "namespace `payments` revoked for subject `worker-a`",
1088                            )),
1089                            delay_stream: None,
1090                        })
1091                    }
1092                }
1093            }
1094        };
1095
1096        let result = worker
1097            .run_with_connector_until(connect, std::future::pending::<()>())
1098            .await;
1099
1100        drop(log_sender);
1101        let mut reports = Vec::new();
1102        while let Some(entry) = log_receiver.recv().await {
1103            if let SessionLog::Reported(index, report) = entry {
1104                reports.push((index, report));
1105            }
1106        }
1107        let Err(error) = result else {
1108            return Err(WorkerError::decode(UnexpectedSuccess));
1109        };
1110        assert!(!error.is_retryable());
1111        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1112        assert_eq!(
1113            reports.len(),
1114            2,
1115            "both workflows' colliding sequence-position results must be re-reported"
1116        );
1117        let mut reported_workflows = Vec::new();
1118        for (session_index, report) in &reports {
1119            assert_eq!(*session_index, 2, "re-reports must land on the new session");
1120            let RecordedReport::Completed(reported_workflow, reported_id, _) = report else {
1121                return Err(WorkerError::decode(UnexpectedReportShape));
1122            };
1123            assert_eq!(reported_id, &activity_id);
1124            reported_workflows.push(reported_workflow.clone());
1125        }
1126        assert!(reported_workflows.contains(&first_workflow));
1127        assert!(reported_workflows.contains(&second_workflow));
1128        Ok(())
1129    }
1130
    /// Shutdown that arrives while the worker is re-establishing a session
    /// after a mid-run drop must surface the ORIGINAL drop error ("stream
    /// reset by peer"). It must not surface an error from the interrupted
    /// recovery attempt, and it must not report success.
    #[tokio::test]
    async fn shutdown_during_recovery_establishment_returns_original_drop_error()
    -> Result<(), WorkerError> {
        let worker = two_activity_worker()?;
        let attempts = Arc::new(AtomicUsize::new(0));
        // Shared by the second connect attempt (which signals) and the
        // shutdown future (which waits).
        let notify = Arc::new(Notify::new());
        let (log_sender, log_receiver) = mpsc::unbounded_channel();
        let connect = {
            let attempts = Arc::clone(&attempts);
            let notify = Arc::clone(&notify);
            move || {
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let notify = Arc::clone(&notify);
                let log = log_sender.clone();
                async move {
                    if attempt == 1 {
                        // First session: the task stream immediately yields a
                        // retryable transport error, i.e. the drop whose error
                        // must be surfaced.
                        Ok(ScriptedSession {
                            index: 1,
                            log,
                            events: vec![Err(WorkerError::Transport {
                                source: tonic::Status::unavailable("stream reset by peer"),
                            })],
                            fail_reports: false,
                            register_denial: None,
                            delay_stream: None,
                        })
                    } else {
                        // Fire shutdown while recovery is still inside the
                        // reconnect machinery's connect attempt, then hang
                        // so only the shutdown arm can win the select.
                        notify.notify_one();
                        std::future::pending::<()>().await;
                        // Never reached; present only to give the block its
                        // `Result` output type.
                        Err(WorkerError::Transport {
                            source: tonic::Status::unavailable("unreachable"),
                        })
                    }
                }
            }
        };
        let shutdown = {
            let notify = Arc::clone(&notify);
            async move {
                // `notify_one` stores a permit if nobody is waiting yet, so the
                // signal is not lost if it fires before this is polled.
                notify.notified().await;
            }
        };

        let run = worker.run_with_connector_until(connect, shutdown);
        // Bounded so a worker that ignores shutdown fails instead of hanging.
        let result = tokio::time::timeout(Duration::from_secs(5), run)
            .await
            .map_err(WorkerError::decode)?;

        // Exactly the original session plus the one interrupted recovery attempt.
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
        let Err(error) = result else {
            return Err(WorkerError::decode(UnexpectedSuccess));
        };
        assert!(matches!(
            error.grpc_status().map(tonic::Status::code),
            Some(tonic::Code::Unavailable)
        ));
        assert_eq!(
            error.grpc_status().map(tonic::Status::message),
            Some("stream reset by peer"),
            "shutdown during recovery establishment must surface the original drop error"
        );
        // Held until here so the session's `register` log send never sees a
        // closed channel.
        drop(log_receiver);
        Ok(())
    }
1198
1199    /// The paused clock keeps every session's lifetime at exactly zero, so
1200    /// no time-based budget reset can fire: flapping sessions that never
1201    /// serve a task must exhaust at exactly `max_attempts` drops.
1202    #[tokio::test(start_paused = true)]
1203    async fn mid_run_drop_budget_exhaustion_surfaces_last_drop_error() -> Result<(), WorkerError> {
1204        let worker = two_activity_worker()?;
1205        let attempts = Arc::new(AtomicUsize::new(0));
1206        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1207        let connect = {
1208            let attempts = Arc::clone(&attempts);
1209            move || {
1210                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1211                let log = log_sender.clone();
1212                async move {
1213                    Ok(ScriptedSession {
1214                        index: attempt,
1215                        log,
1216                        events: vec![Err(WorkerError::Transport {
1217                            source: tonic::Status::unavailable("stream reset by peer"),
1218                        })],
1219                        fail_reports: false,
1220                        register_denial: None,
1221                        delay_stream: None,
1222                    })
1223                }
1224            }
1225        };
1226
1227        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1228        let result = tokio::time::timeout(Duration::from_secs(5), run)
1229            .await
1230            .map_err(WorkerError::decode)?;
1231
1232        // test_config allows 3 reconnect attempts; the third mid-run drop
1233        // exhausts the cumulative drop budget without another reconnect.
1234        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1235        let Err(error) = result else {
1236            return Err(WorkerError::decode(UnexpectedSuccess));
1237        };
1238        assert!(error.is_retryable());
1239        assert!(matches!(
1240            error.grpc_status().map(tonic::Status::code),
1241            Some(tonic::Code::Unavailable)
1242        ));
1243        assert_eq!(
1244            error.grpc_status().map(tonic::Status::message),
1245            Some("stream reset by peer")
1246        );
1247        drop(log_receiver);
1248        Ok(())
1249    }
1250
1251    #[tokio::test]
1252    async fn mid_run_denial_surfaces_without_reconnect() -> Result<(), WorkerError> {
1253        let worker = two_activity_worker()?;
1254        let attempts = Arc::new(AtomicUsize::new(0));
1255        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1256        let connect = {
1257            let attempts = Arc::clone(&attempts);
1258            move || {
1259                attempts.fetch_add(1, Ordering::SeqCst);
1260                let log = log_sender.clone();
1261                async move {
1262                    Ok(ScriptedSession {
1263                        index: 1,
1264                        log,
1265                        events: vec![Err(WorkerError::Transport {
1266                            source: tonic::Status::permission_denied(
1267                                "namespace `payments` revoked for subject `worker-a`",
1268                            ),
1269                        })],
1270                        fail_reports: false,
1271                        register_denial: None,
1272                        delay_stream: None,
1273                    })
1274                }
1275            }
1276        };
1277
1278        let result = worker
1279            .run_with_connector_until(connect, std::future::pending::<()>())
1280            .await;
1281
1282        assert_eq!(attempts.load(Ordering::SeqCst), 1);
1283        let Err(error) = result else {
1284            return Err(WorkerError::decode(UnexpectedSuccess));
1285        };
1286        assert!(!error.is_retryable());
1287        assert!(matches!(
1288            error.grpc_status().map(tonic::Status::code),
1289            Some(tonic::Code::PermissionDenied)
1290        ));
1291        assert_eq!(
1292            error.grpc_status().map(tonic::Status::message),
1293            Some("namespace `payments` revoked for subject `worker-a`")
1294        );
1295        drop(log_receiver);
1296        Ok(())
1297    }
1298
1299    #[tokio::test]
1300    async fn shutdown_during_establishment_backoff_returns_promptly() -> Result<(), WorkerError> {
1301        let worker = two_activity_worker_with(slow_reconnect_config())?;
1302        let attempts = Arc::new(AtomicUsize::new(0));
1303        let notify = Arc::new(Notify::new());
1304        let connect = {
1305            let attempts = Arc::clone(&attempts);
1306            let notify = Arc::clone(&notify);
1307            move || {
1308                attempts.fetch_add(1, Ordering::SeqCst);
1309                notify.notify_one();
1310                async move {
1311                    Err::<ScriptedSession, _>(WorkerError::Transport {
1312                        source: tonic::Status::unavailable("engine unreachable"),
1313                    })
1314                }
1315            }
1316        };
1317        let shutdown = {
1318            let notify = Arc::clone(&notify);
1319            async move {
1320                notify.notified().await;
1321            }
1322        };
1323
1324        let run = worker.run_with_connector_until(connect, shutdown);
1325        tokio::time::timeout(Duration::from_millis(500), run)
1326            .await
1327            .map_err(WorkerError::decode)??;
1328
1329        assert_eq!(attempts.load(Ordering::SeqCst), 1);
1330        Ok(())
1331    }
1332
1333    #[tokio::test]
1334    async fn shutdown_during_mid_run_drop_backoff_returns_promptly() -> Result<(), WorkerError> {
1335        let worker = two_activity_worker_with(slow_reconnect_config())?;
1336        let attempts = Arc::new(AtomicUsize::new(0));
1337        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1338        let connect = {
1339            let attempts = Arc::clone(&attempts);
1340            move || {
1341                attempts.fetch_add(1, Ordering::SeqCst);
1342                let log = log_sender.clone();
1343                async move {
1344                    Ok(ScriptedSession {
1345                        index: 1,
1346                        log,
1347                        events: vec![Err(WorkerError::Transport {
1348                            source: tonic::Status::unavailable("stream reset by peer"),
1349                        })],
1350                        fail_reports: false,
1351                        register_denial: None,
1352                        delay_stream: None,
1353                    })
1354                }
1355            }
1356        };
1357        let shutdown = async {
1358            tokio::time::sleep(Duration::from_millis(50)).await;
1359        };
1360
1361        let run = worker.run_with_connector_until(connect, shutdown);
1362        let result = tokio::time::timeout(Duration::from_millis(500), run)
1363            .await
1364            .map_err(WorkerError::decode)?;
1365
1366        assert_eq!(attempts.load(Ordering::SeqCst), 1);
1367        let Err(error) = result else {
1368            return Err(WorkerError::decode(UnexpectedSuccess));
1369        };
1370        assert!(error.is_retryable());
1371        assert!(matches!(
1372            error.grpc_status().map(tonic::Status::code),
1373            Some(tonic::Code::Unavailable)
1374        ));
1375        drop(log_receiver);
1376        Ok(())
1377    }
1378
1379    #[tokio::test]
1380    async fn served_tasks_reset_drop_budget_across_cycles() -> Result<(), WorkerError> {
1381        let workflow_id = WorkflowId::new_v4();
1382        let activity_id = ActivityId::from_sequence_position(7);
1383        // max_backoff is enormous so only the served-task rule can reset the
1384        // budget; max_attempts = 2 so any two unhealthy drops would end the run.
1385        let worker = two_activity_worker_with(test_config_with(ReconnectConfig::new(
1386            Duration::from_millis(1),
1387            Duration::from_secs(3600),
1388            2,
1389        )))?;
1390        let attempts = Arc::new(AtomicUsize::new(0));
1391        let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1392        let connect = {
1393            let attempts = Arc::clone(&attempts);
1394            let log_sender = log_sender.clone();
1395            let workflow_id = workflow_id.clone();
1396            let activity_id = activity_id.clone();
1397            move || {
1398                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1399                let log = log_sender.clone();
1400                let task = proto_task(workflow_id.clone(), activity_id.clone(), "double", 21);
1401                async move {
1402                    if attempt <= 4 {
1403                        Ok(ScriptedSession {
1404                            index: attempt,
1405                            log,
1406                            events: vec![
1407                                Ok(WorkerSessionEvent::Task(task)),
1408                                Err(WorkerError::Transport {
1409                                    source: tonic::Status::unavailable("stream reset by peer"),
1410                                }),
1411                            ],
1412                            fail_reports: false,
1413                            register_denial: None,
1414                            delay_stream: None,
1415                        })
1416                    } else {
1417                        Ok(ScriptedSession {
1418                            index: attempt,
1419                            log,
1420                            events: Vec::new(),
1421                            fail_reports: false,
1422                            register_denial: Some(tonic::Status::permission_denied(
1423                                "namespace `payments` revoked for subject `worker-a`",
1424                            )),
1425                            delay_stream: None,
1426                        })
1427                    }
1428                }
1429            }
1430        };
1431
1432        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1433        let result = tokio::time::timeout(Duration::from_secs(5), run)
1434            .await
1435            .map_err(WorkerError::decode)?;
1436
1437        drop(log_sender);
1438        let mut registrations = 0_usize;
1439        while let Some(entry) = log_receiver.recv().await {
1440            if let SessionLog::Registered(..) = entry {
1441                registrations += 1;
1442            }
1443        }
1444        // Four sessions each served a task before dropping; every served task
1445        // reset the cumulative budget (max_attempts = 2), so the worker kept
1446        // recovering well past the budget until the deterministic denial on
1447        // the fifth establishment ended the run fail-fast.
1448        assert_eq!(attempts.load(Ordering::SeqCst), 5);
1449        assert_eq!(registrations, 4);
1450        let Err(error) = result else {
1451            return Err(WorkerError::decode(UnexpectedSuccess));
1452        };
1453        assert!(!error.is_retryable());
1454        assert!(matches!(
1455            error.grpc_status().map(tonic::Status::code),
1456            Some(tonic::Code::PermissionDenied)
1457        ));
1458        Ok(())
1459    }
1460
1461    #[tokio::test(start_paused = true)]
1462    async fn session_outliving_max_backoff_resets_drop_budget() -> Result<(), WorkerError> {
1463        let worker = two_activity_worker_with(test_config_with(ReconnectConfig::new(
1464            Duration::from_millis(5),
1465            Duration::from_millis(20),
1466            2,
1467        )))?;
1468        let attempts = Arc::new(AtomicUsize::new(0));
1469        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1470        let connect = {
1471            let attempts = Arc::clone(&attempts);
1472            move || {
1473                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1474                let log = log_sender.clone();
1475                async move {
1476                    Ok(ScriptedSession {
1477                        index: attempt,
1478                        log,
1479                        events: vec![Err(WorkerError::Transport {
1480                            source: tonic::Status::unavailable("stream reset by peer"),
1481                        })],
1482                        fail_reports: false,
1483                        register_denial: None,
1484                        // Only the second session outlives the 20ms max
1485                        // backoff before dropping; the others drop instantly
1486                        // (the paused clock keeps their lifetimes at zero).
1487                        delay_stream: (attempt == 2).then_some(Duration::from_millis(30)),
1488                    })
1489                }
1490            }
1491        };
1492
1493        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1494        let result = tokio::time::timeout(Duration::from_secs(5), run)
1495            .await
1496            .map_err(WorkerError::decode)?;
1497
1498        // Drop one consumed the first budget unit. The second session served
1499        // no tasks but survived past max_backoff, so its drop restarted the
1500        // count at one. The third session's instant drop was the second
1501        // post-reset unit and exhausted max_attempts = 2 — proving exactly
1502        // one unit was consumed before the reset. Without the reset the run
1503        // would have ended after two sessions.
1504        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1505        let Err(error) = result else {
1506            return Err(WorkerError::decode(UnexpectedSuccess));
1507        };
1508        assert!(error.is_retryable());
1509        assert!(matches!(
1510            error.grpc_status().map(tonic::Status::code),
1511            Some(tonic::Code::Unavailable)
1512        ));
1513        drop(log_receiver);
1514        Ok(())
1515    }
1516
1517    /// Connected lifetime is measured to the stream end, not to the end of
1518    /// the post-drop drain: a 60ms in-flight handler draining past the 20ms
1519    /// max backoff after the stream already dropped (with its report failing,
1520    /// so no task counts as served) must not reset the budget. Measured to
1521    /// the end of the drain, every cycle would reset the budget and the
1522    /// worker would flap forever instead of exhausting.
1523    #[tokio::test(start_paused = true)]
1524    async fn post_drop_drain_time_does_not_reset_drop_budget() -> Result<(), WorkerError> {
1525        let workflow_id = WorkflowId::new_v4();
1526        let activity_id = ActivityId::from_sequence_position(9);
1527        // max_concurrency = 2 so the stream error is read while the slow
1528        // handler still holds the first dispatch permit.
1529        let config = WorkerConfig::new(
1530            "http://127.0.0.1:50051",
1531            "payments",
1532            "worker-a",
1533            2,
1534            ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 2),
1535            None,
1536        );
1537        let worker = Worker::builder(config)
1538            .register_activity("slow", |input: TestInput, context: &ActivityContext| {
1539                let _ = (input, context);
1540                Box::pin(async move {
1541                    // Outlives the 20ms max backoff on the paused clock while
1542                    // the post-drop drain awaits this handler.
1543                    tokio::time::sleep(Duration::from_millis(60)).await;
1544                    Ok(TestOutput { value: 1 })
1545                })
1546            })?
1547            .build()?;
1548        let attempts = Arc::new(AtomicUsize::new(0));
1549        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1550        let connect = {
1551            let attempts = Arc::clone(&attempts);
1552            let workflow_id = workflow_id.clone();
1553            let activity_id = activity_id.clone();
1554            move || {
1555                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1556                let log = log_sender.clone();
1557                let task = proto_task(workflow_id.clone(), activity_id.clone(), "slow", 1);
1558                async move {
1559                    if attempt == 1 {
1560                        // Instant drop with no task: consumes the first
1561                        // budget unit and leaves the unacked tracker empty,
1562                        // so the second cycle reaches its serve loop.
1563                        Ok(ScriptedSession {
1564                            index: 1,
1565                            log,
1566                            events: vec![Err(WorkerError::Transport {
1567                                source: tonic::Status::unavailable("stream reset by peer"),
1568                            })],
1569                            fail_reports: false,
1570                            register_denial: None,
1571                            delay_stream: None,
1572                        })
1573                    } else {
1574                        // The server dispatches the 60ms task and kills the
1575                        // stream immediately. Failed reports keep
1576                        // tasks_reported at zero, so only a (mis)measured
1577                        // connected lifetime could reset the budget.
1578                        Ok(ScriptedSession {
1579                            index: attempt,
1580                            log,
1581                            events: vec![
1582                                Ok(WorkerSessionEvent::Task(task)),
1583                                Err(WorkerError::Transport {
1584                                    source: tonic::Status::unavailable("stream reset by peer"),
1585                                }),
1586                            ],
1587                            fail_reports: true,
1588                            register_denial: None,
1589                            delay_stream: None,
1590                        })
1591                    }
1592                }
1593            }
1594        };
1595
1596        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1597        let result = tokio::time::timeout(Duration::from_secs(5), run)
1598            .await
1599            .map_err(WorkerError::decode)?;
1600
1601        // The second session's stream dropped at a connected lifetime of ~0
1602        // on the paused clock while its 60ms handler drained past the 20ms
1603        // max backoff; it never proved healthy, so its drop exhausted
1604        // max_attempts = 2. Measured to the end of the drain instead, the
1605        // second cycle would have reset the budget and dialled a third
1606        // session.
1607        assert_eq!(attempts.load(Ordering::SeqCst), 2);
1608        let Err(error) = result else {
1609            return Err(WorkerError::decode(UnexpectedSuccess));
1610        };
1611        assert!(error.is_retryable());
1612        assert!(matches!(
1613            error.grpc_status().map(tonic::Status::code),
1614            Some(tonic::Code::Unavailable)
1615        ));
1616        drop(log_receiver);
1617        Ok(())
1618    }
1619
1620    #[tokio::test]
1621    async fn clean_close_reconnects_re_registers_and_keeps_serving() -> Result<(), WorkerError> {
1622        let workflow_id = WorkflowId::new_v4();
1623        let first_activity = ActivityId::from_sequence_position(1);
1624        let second_activity = ActivityId::from_sequence_position(2);
1625        let worker = two_activity_worker()?;
1626        let attempts = Arc::new(AtomicUsize::new(0));
1627        let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1628        let connect = {
1629            let attempts = Arc::clone(&attempts);
1630            let log_sender = log_sender.clone();
1631            let workflow_id = workflow_id.clone();
1632            let first_activity = first_activity.clone();
1633            let second_activity = second_activity.clone();
1634            move || {
1635                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1636                let log = log_sender.clone();
1637                let first_task =
1638                    proto_task(workflow_id.clone(), first_activity.clone(), "double", 10);
1639                let second_task =
1640                    proto_task(workflow_id.clone(), second_activity.clone(), "double", 20);
1641                async move {
1642                    match attempt {
1643                        // Both sessions end with a clean server-side stream
1644                        // close after serving one task each.
1645                        1 => Ok(ScriptedSession {
1646                            index: 1,
1647                            log,
1648                            events: vec![Ok(WorkerSessionEvent::Task(first_task))],
1649                            fail_reports: false,
1650                            register_denial: None,
1651                            delay_stream: None,
1652                        }),
1653                        2 => Ok(ScriptedSession {
1654                            index: 2,
1655                            log,
1656                            events: vec![Ok(WorkerSessionEvent::Task(second_task))],
1657                            fail_reports: false,
1658                            register_denial: None,
1659                            delay_stream: None,
1660                        }),
1661                        _ => Ok(ScriptedSession {
1662                            index: attempt,
1663                            log,
1664                            events: Vec::new(),
1665                            fail_reports: false,
1666                            register_denial: Some(tonic::Status::permission_denied(
1667                                "namespace `payments` revoked for subject `worker-a`",
1668                            )),
1669                            delay_stream: None,
1670                        }),
1671                    }
1672                }
1673            }
1674        };
1675
1676        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1677        let result = tokio::time::timeout(Duration::from_secs(5), run)
1678            .await
1679            .map_err(WorkerError::decode)?;
1680
1681        drop(log_sender);
1682        let mut registrations = Vec::new();
1683        let mut reports = Vec::new();
1684        while let Some(entry) = log_receiver.recv().await {
1685            match entry {
1686                SessionLog::Registered(index, types) => registrations.push((index, types)),
1687                SessionLog::Reported(index, report) => reports.push((index, report)),
1688            }
1689        }
1690        // Each clean close redialled through the budgeted cycle: the worker
1691        // re-registered, re-reported the unacknowledged backlog, and kept
1692        // serving until the deterministic denial ended the run.
1693        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1694        let expected_types = vec![String::from("double"), String::from("increment")];
1695        assert_eq!(
1696            registrations,
1697            vec![(1, expected_types.clone()), (2, expected_types)]
1698        );
1699        assert_eq!(reports.len(), 3);
1700        assert!(matches!(
1701            &reports[0],
1702            (1, RecordedReport::Completed(_, id, _)) if id == &first_activity
1703        ));
1704        assert!(matches!(
1705            &reports[1],
1706            (2, RecordedReport::Completed(_, id, _)) if id == &first_activity
1707        ));
1708        assert!(matches!(
1709            &reports[2],
1710            (2, RecordedReport::Completed(_, id, _)) if id == &second_activity
1711        ));
1712        let Err(error) = result else {
1713            return Err(WorkerError::decode(UnexpectedSuccess));
1714        };
1715        assert!(!error.is_retryable());
1716        assert!(matches!(
1717            error.grpc_status().map(tonic::Status::code),
1718            Some(tonic::Code::PermissionDenied)
1719        ));
1720        Ok(())
1721    }
1722
1723    #[tokio::test(start_paused = true)]
1724    async fn clean_close_loop_exhausts_drop_budget_with_classified_error() -> Result<(), WorkerError>
1725    {
1726        let worker = two_activity_worker()?;
1727        let attempts = Arc::new(AtomicUsize::new(0));
1728        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1729        let connect = {
1730            let attempts = Arc::clone(&attempts);
1731            move || {
1732                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1733                let log = log_sender.clone();
1734                async move {
1735                    Ok(ScriptedSession {
1736                        index: attempt,
1737                        log,
1738                        events: Vec::new(),
1739                        fail_reports: false,
1740                        register_denial: None,
1741                        delay_stream: None,
1742                    })
1743                }
1744            }
1745        };
1746
1747        let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1748        let result = tokio::time::timeout(Duration::from_secs(5), run)
1749            .await
1750            .map_err(WorkerError::decode)?;
1751
1752        // test_config allows 3 attempts: with the paused clock no session
1753        // outlives max_backoff and none serves a task, so the third clean
1754        // close exhausts the budget with the classified clean-close error —
1755        // exactly the same accounting as error drops.
1756        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1757        let Err(error) = result else {
1758            return Err(WorkerError::decode(UnexpectedSuccess));
1759        };
1760        assert!(matches!(error, WorkerError::CleanCloseExhausted));
1761        assert!(error.to_string().contains("closed the stream cleanly"));
1762        drop(log_receiver);
1763        Ok(())
1764    }
1765
1766    #[tokio::test]
1767    async fn shutdown_during_clean_close_backoff_returns_ok_promptly() -> Result<(), WorkerError> {
1768        let worker = two_activity_worker_with(slow_reconnect_config())?;
1769        let attempts = Arc::new(AtomicUsize::new(0));
1770        let (log_sender, log_receiver) = mpsc::unbounded_channel();
1771        let connect = {
1772            let attempts = Arc::clone(&attempts);
1773            move || {
1774                attempts.fetch_add(1, Ordering::SeqCst);
1775                let log = log_sender.clone();
1776                async move {
1777                    Ok(ScriptedSession {
1778                        index: 1,
1779                        log,
1780                        events: Vec::new(),
1781                        fail_reports: false,
1782                        register_denial: None,
1783                        delay_stream: None,
1784                    })
1785                }
1786            }
1787        };
1788        let shutdown = async {
1789            tokio::time::sleep(Duration::from_millis(50)).await;
1790        };
1791
1792        // The clean close enters the 5s drop backoff; shutdown must win it
1793        // promptly and a clean close pending recovery is not an error.
1794        let run = worker.run_with_connector_until(connect, shutdown);
1795        tokio::time::timeout(Duration::from_millis(500), run)
1796            .await
1797            .map_err(WorkerError::decode)??;
1798
1799        assert_eq!(attempts.load(Ordering::SeqCst), 1);
1800        drop(log_receiver);
1801        Ok(())
1802    }
1803}