1use std::collections::BTreeSet;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use tracing::{error, info, warn};
12
13use crate::activity::{ActivityRegistry, HandlerFuture};
14use crate::config::WorkerConfig;
15use crate::context::ActivityContext;
16use crate::error::WorkerError;
17use crate::protocol::reconnect::{
18 ReconnectBackoff, UnackedResultTracker, re_report_unacked, reconnect_with_backoff,
19 register_connected_session,
20};
21use crate::protocol::{GrpcWorkerSession, WorkerSession};
22use crate::runtime::{
23 NoShutdown, ServeEnd, SessionHealth, serve_activity_tasks, serve_activity_tasks_until,
24};
25
26#[must_use]
28pub struct WorkerBuilder {
29 config: WorkerConfig,
30 activities: ActivityRegistry,
31}
32
33impl WorkerBuilder {
34 pub fn new(config: WorkerConfig) -> Self {
36 Self {
37 config,
38 activities: ActivityRegistry::new(),
39 }
40 }
41
42 pub fn register_activity<Input, Output, Handler>(
48 mut self,
49 activity_type: impl Into<String>,
50 handler: Handler,
51 ) -> Result<Self, WorkerError>
52 where
53 Input: Serialize + DeserializeOwned + Send + Sync + 'static,
54 Output: Serialize + Send + Sync + 'static,
55 Handler: for<'context> Fn(Input, &'context ActivityContext) -> HandlerFuture<'context, Output>
56 + Send
57 + Sync
58 + 'static,
59 {
60 self.activities = self.activities.register_activity(activity_type, handler)?;
61 Ok(self)
62 }
63
64 pub fn build(self) -> Result<Worker, WorkerError> {
70 if self.activities.is_empty() {
71 return Err(WorkerError::registration(EmptyActivitySet));
72 }
73 let available_handlers = self.activities.activity_types();
74 let activity_types = available_handlers.iter().cloned().collect();
75 Ok(Worker {
76 config: self.config,
77 activity_types,
78 available_handlers,
79 activities: Arc::new(self.activities),
80 })
81 }
82}
83
/// A configured worker with a non-empty set of activity handlers.
///
/// Produced by [`WorkerBuilder::build`]; run it with one of the `run*` methods.
#[must_use]
pub struct Worker {
    /// Settings used for connecting, registering and serving.
    config: WorkerConfig,
    /// Names sent to the server on registration, in the iteration order of
    /// `available_handlers` (a sorted set).
    activity_types: Vec<String>,
    /// Every activity type that has a registered handler.
    available_handlers: BTreeSet<String>,
    /// Shared so each session's serve loop can take its own handle.
    activities: Arc<ActivityRegistry>,
}
92
impl Worker {
    /// Starts building a worker from `config`.
    ///
    /// Shorthand for [`WorkerBuilder::new`].
    pub fn builder(config: WorkerConfig) -> WorkerBuilder {
        WorkerBuilder::new(config)
    }

    /// Activity type names this worker registers with the server.
    ///
    /// [`WorkerBuilder::build`] collects them from the handler set, so they come
    /// out in that set's sorted order.
    #[must_use]
    pub fn activity_types(&self) -> &[String] {
        &self.activity_types
    }

    /// Names of the activity types that have a registered handler.
    #[must_use]
    pub fn available_handlers(&self) -> &BTreeSet<String> {
        &self.available_handlers
    }

    /// Connects over gRPC and serves activity tasks with no external shutdown
    /// signal.
    ///
    /// Equivalent to [`Worker::run_until`] with a shutdown future that never
    /// completes.
    ///
    /// # Errors
    ///
    /// See [`Worker::run_with_connector_until`].
    pub async fn run(self) -> Result<(), WorkerError> {
        self.run_until(std::future::pending::<()>()).await
    }

    /// Connects over gRPC (via `GrpcWorkerSession::connect`) and serves activity
    /// tasks until `shutdown` completes.
    ///
    /// Every connection attempt receives its own clone of this worker's config.
    ///
    /// # Errors
    ///
    /// See [`Worker::run_with_connector_until`].
    pub async fn run_until<Shutdown>(self, shutdown: Shutdown) -> Result<(), WorkerError>
    where
        Shutdown: Future<Output = ()> + Send,
    {
        let config = self.config.clone();
        self.run_with_connector_until(move || GrpcWorkerSession::connect(config.clone()), shutdown)
            .await
    }

    /// Serves activity tasks over sessions produced by `connect`, reconnecting
    /// after session drops, until `shutdown` completes.
    ///
    /// Each loop iteration:
    ///
    /// 1. Establishes and registers a session with `reconnect_with_backoff`,
    ///    racing it against `shutdown` (shutdown is checked first).
    /// 2. Re-reports the results held by the `UnackedResultTracker` on the new
    ///    session, then serves tasks until serving ends or fails.
    /// 3. On a retryable drop (an error or a clean stream close), counts it against
    ///    a budget of `ReconnectBackoff::attempts()` drops and sleeps
    ///    `delay_for_attempt` before reconnecting. The count restarts once a
    ///    session has proved healthy: it reported at least one task, or stayed
    ///    connected for longer than `ReconnectBackoff::max_delay()`.
    ///
    /// `shutdown` is polled through a latch, so after it completes it is never
    /// polled again and every later wait resolves immediately.
    ///
    /// # Errors
    ///
    /// - The error from `ReconnectBackoff::from_config` if it fails.
    /// - The error from `reconnect_with_backoff` if (re)establishing a session fails.
    /// - A non-retryable serve or re-report error, immediately, without reconnecting.
    /// - A serve error observed after `shutdown` has completed, as-is.
    /// - When the drop budget is exhausted: the last drop's error, or
    ///   `WorkerError::CleanCloseExhausted` if the last drop was a clean close.
    /// - When `shutdown` completes during the backoff or re-establishment that
    ///   follows a failed (not clean) drop: that drop's error.
    pub async fn run_with_connector_until<S, F, Fut, Shutdown>(
        self,
        mut connect: F,
        shutdown: Shutdown,
    ) -> Result<(), WorkerError>
    where
        S: WorkerSession,
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<S, WorkerError>>,
        Shutdown: Future<Output = ()> + Send,
    {
        let backoff = ReconnectBackoff::from_config(&self.config)?;
        // Lives across sessions so `re_report_unacked` can replay what an earlier
        // session left unacknowledged (per the tracker's name) on the next one.
        let mut tracker = UnackedResultTracker::new();
        tokio::pin!(shutdown);
        // Latch the shutdown future so it can be awaited from several places and
        // across iterations without re-polling it after it completes.
        let mut shutdown = SharedShutdown::new(shutdown);
        // Session drops counted against `backoff.attempts()`.
        let mut drop_failures = 0_usize;
        // Error of the most recent failed drop; returned if shutdown wins the race
        // against re-establishing the session.
        let mut recovery_error: Option<WorkerError> = None;

        loop {
            // `biased` makes the select check shutdown before (re)connecting.
            let connected = tokio::select! {
                biased;
                () = shutdown.wait() => {
                    return recovery_error.take().map_or(Ok(()), Err);
                }
                result = reconnect_with_backoff(
                    &self.config,
                    self.activity_types.clone(),
                    &self.available_handlers,
                    &mut connect,
                ) => result,
            };
            let mut session = connected?;
            let session_started = tokio::time::Instant::now();
            let mut health = SessionHealth::default();
            // Replay unacknowledged results before taking new tasks on this session.
            let served = match re_report_unacked(&tracker, &mut session).await {
                Ok(()) => {
                    serve_activity_tasks_until(
                        &self.config,
                        &mut session,
                        Arc::clone(&self.activities),
                        &mut tracker,
                        &mut health,
                        shutdown.wait(),
                    )
                    .await
                }
                Err(report_error) => Err(report_error),
            };
            // Release the session before deciding whether to back off and reconnect.
            drop(session);
            let cause = match served {
                Ok(ServeEnd::Shutdown) => return Ok(()),
                Ok(ServeEnd::StreamClosed) => {
                    // A close seen after shutdown fired counts as a clean shutdown.
                    if shutdown.fired() {
                        return Ok(());
                    }
                    DropCause::CleanClose
                }
                Err(error) if !error.is_retryable() => {
                    error!(error = %error, "worker session denied by server; not reconnecting");
                    return Err(error);
                }
                Err(error) => {
                    // Never reconnect once shutdown has fired; surface the failure.
                    if shutdown.fired() {
                        return Err(error);
                    }
                    DropCause::Failure(error)
                }
            };
            // Uptime of this session: until the stream ended if that was recorded,
            // otherwise until now.
            let connected_for = health
                .stream_ended_at
                .unwrap_or_else(tokio::time::Instant::now)
                .saturating_duration_since(session_started);
            // A session that reported work, or outlived `backoff.max_delay()`, clears
            // earlier drops from the budget.
            let proved_healthy = health.tasks_reported > 0 || connected_for > backoff.max_delay();
            if proved_healthy && drop_failures > 0 {
                info!(
                    drop_failures,
                    tasks_reported = health.tasks_reported,
                    "worker session proved healthy; drop budget reset"
                );
                drop_failures = 0;
            }
            drop_failures += 1;
            if drop_failures >= backoff.attempts() {
                let error = cause.into_exhaustion_error();
                error!(
                    drop_failures,
                    error = %error,
                    "worker session drop budget exhausted; not reconnecting"
                );
                return Err(error);
            }
            let delay = backoff.delay_for_attempt(drop_failures);
            warn!(
                drop_failures,
                delay_ms = delay.as_millis(),
                cause = %cause,
                "worker session dropped; reconnecting after backoff"
            );
            // Sleep out the backoff, but let shutdown cut it short.
            let shutdown_won = tokio::select! {
                biased;
                () = shutdown.wait() => true,
                () = tokio::time::sleep(delay) => false,
            };
            if shutdown_won {
                return cause.into_shutdown_result();
            }
            // Keep a failed drop's error so a shutdown during re-establishment
            // reports it instead of returning `Ok`.
            recovery_error = cause.into_recovery_error();
        }
    }

    /// Registers and serves an already-connected `session` with no shutdown
    /// signal, returning the session when serving ends.
    ///
    /// Equivalent to [`Worker::run_with_session_until`] with a shutdown future
    /// that never completes.
    ///
    /// # Errors
    ///
    /// See [`Worker::run_with_session_until`].
    pub async fn run_with_session<S>(self, session: S) -> Result<S, WorkerError>
    where
        S: WorkerSession,
    {
        self.run_with_session_until(session, std::future::pending::<()>())
            .await
    }

    /// Registers `session` via `register_connected_session`, then serves activity
    /// tasks on it until serving ends, with `shutdown` passed to the serve loop.
    ///
    /// Unlike [`Worker::run_with_connector_until`], this never reconnects and does
    /// not re-report results from earlier sessions: it starts with a fresh
    /// `UnackedResultTracker`. On success the session is handed back to the caller.
    ///
    /// # Errors
    ///
    /// Returns the first registration or serving error.
    pub async fn run_with_session_until<S, Shutdown>(
        self,
        session: S,
        shutdown: Shutdown,
    ) -> Result<S, WorkerError>
    where
        S: WorkerSession,
        Shutdown: Future<Output = ()> + Send,
    {
        let mut session = register_connected_session(
            session,
            &self.config,
            self.activity_types.clone(),
            &self.available_handlers,
        )
        .await?;
        let mut tracker = UnackedResultTracker::new();
        let mut health = SessionHealth::default();
        serve_activity_tasks_until(
            &self.config,
            &mut session,
            self.activities,
            &mut tracker,
            &mut health,
            shutdown,
        )
        .await?;
        Ok(session)
    }
}
348
349enum DropCause {
351 Failure(WorkerError),
353 CleanClose,
355}
356
357impl DropCause {
358 fn into_exhaustion_error(self) -> WorkerError {
360 match self {
361 Self::Failure(error) => error,
362 Self::CleanClose => WorkerError::CleanCloseExhausted,
363 }
364 }
365
366 fn into_shutdown_result(self) -> Result<(), WorkerError> {
369 match self {
370 Self::Failure(error) => Err(error),
371 Self::CleanClose => Ok(()),
372 }
373 }
374
375 fn into_recovery_error(self) -> Option<WorkerError> {
377 match self {
378 Self::Failure(error) => Some(error),
379 Self::CleanClose => None,
380 }
381 }
382}
383
384impl std::fmt::Display for DropCause {
385 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 Self::Failure(error) => write!(formatter, "{error}"),
388 Self::CleanClose => write!(formatter, "server closed the worker stream cleanly"),
389 }
390 }
391}
392
393struct SharedShutdown<'a, S> {
402 inner: Pin<&'a mut S>,
403 fired: bool,
404}
405
406impl<'a, S> SharedShutdown<'a, S>
407where
408 S: Future<Output = ()> + Send,
409{
410 const fn new(inner: Pin<&'a mut S>) -> Self {
411 Self {
412 inner,
413 fired: false,
414 }
415 }
416
417 const fn fired(&self) -> bool {
419 self.fired
420 }
421
422 fn wait(&mut self) -> impl Future<Output = ()> + Send {
424 std::future::poll_fn(|context| {
425 if self.fired {
426 return Poll::Ready(());
427 }
428 match self.inner.as_mut().poll(context) {
429 Poll::Ready(()) => {
430 self.fired = true;
431 Poll::Ready(())
432 }
433 Poll::Pending => Poll::Pending,
434 }
435 })
436 }
437}
438
439pub async fn run_worker_with_session<S>(worker: Worker, session: S) -> Result<S, WorkerError>
445where
446 S: WorkerSession,
447{
448 worker.run_with_session(session).await
449}
450
/// Error source used by [`WorkerBuilder::build`] when no activity handler has
/// been registered; it is wrapped via `WorkerError::registration`.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("worker must register at least one activity handler")]
pub struct EmptyActivitySet;
455
// Compile-time-only reference to `GrpcWorkerSession`, `NoShutdown`, and the
// non-`_until` `serve_activity_tasks` entry point. Nothing visible here calls it.
// Presumably it exists so those imports stay used, and so the build breaks if
// `serve_activity_tasks` stops type-checking for the gRPC session and the
// registry. The leading underscore keeps the dead-code lint quiet.
fn _assert_live_session_type() {
    let _ = std::mem::size_of::<GrpcWorkerSession>();
    let _ = std::mem::size_of::<NoShutdown>();
    let _ = serve_activity_tasks::<GrpcWorkerSession, ActivityRegistry>;
}
461
462#[cfg(test)]
463mod tests {
464 use std::collections::BTreeSet;
465 use std::sync::Arc;
466 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
467 use std::time::Duration;
468
469 use aion_core::{ActivityError, ActivityId, ContentType, Payload, WorkflowId};
470 use aion_proto::{ProtoActivityId, ProtoActivityTask, ProtoPayload, ProtoWorkflowId};
471 use async_trait::async_trait;
472 use futures::StreamExt as _;
473 use futures::stream;
474 use serde::{Deserialize, Serialize};
475 use tokio::sync::{Notify, mpsc};
476
477 use super::{Worker, WorkerBuilder};
478 use crate::config::{ReconnectConfig, WorkerConfig};
479 use crate::context::ActivityContext;
480 use crate::error::WorkerError;
481 use crate::protocol::{
482 WorkerSession, WorkerSessionEvent, WorkerTaskStream, validate_activity_handlers,
483 };
484
485 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
486 struct TestInput {
487 value: i32,
488 }
489
490 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
491 struct TestOutput {
492 value: i32,
493 }
494
495 struct ChannelSession {
496 receiver: Option<mpsc::Receiver<Result<WorkerSessionEvent, WorkerError>>>,
497 reports: Vec<RecordedReport>,
498 registered: Vec<String>,
499 }
500
501 #[derive(Clone, Debug, PartialEq, Eq)]
502 enum RecordedReport {
503 Completed(WorkflowId, ActivityId, Payload),
504 Failed(WorkflowId, ActivityId, ActivityError),
505 }
506
507 #[async_trait]
508 impl WorkerSession for ChannelSession {
509 async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
510 drop(config.clone());
511 Ok(())
512 }
513
514 async fn register(
515 &mut self,
516 activity_types: Vec<String>,
517 available_handlers: &BTreeSet<String>,
518 ) -> Result<(), WorkerError> {
519 validate_activity_handlers(&activity_types, available_handlers)?;
520 self.registered = activity_types;
521 Ok(())
522 }
523
524 fn receive_tasks(&mut self) -> WorkerTaskStream {
525 match self.receiver.take() {
526 Some(receiver) => Box::pin(tokio_stream::wrappers::ReceiverStream::new(receiver)),
527 None => Box::pin(stream::empty()),
528 }
529 }
530
531 async fn report_result(
532 &mut self,
533 workflow_id: WorkflowId,
534 activity_id: ActivityId,
535 result: Payload,
536 ) -> Result<(), WorkerError> {
537 self.reports
538 .push(RecordedReport::Completed(workflow_id, activity_id, result));
539 Ok(())
540 }
541
542 async fn report_failure(
543 &mut self,
544 workflow_id: WorkflowId,
545 activity_id: ActivityId,
546 failure: ActivityError,
547 ) -> Result<(), WorkerError> {
548 self.reports
549 .push(RecordedReport::Failed(workflow_id, activity_id, failure));
550 Ok(())
551 }
552
553 async fn send_heartbeat(
554 &mut self,
555 workflow_id: WorkflowId,
556 activity_id: ActivityId,
557 progress: Option<Payload>,
558 ) -> Result<(), WorkerError> {
559 drop((workflow_id, activity_id, progress));
560 Ok(())
561 }
562 }
563
564 #[test]
565 fn empty_worker_is_rejected() {
566 let error = WorkerBuilder::new(test_config()).build().err();
567
568 assert!(error.is_some_and(|error| error.to_string().contains("at least one activity")));
569 }
570
571 #[test]
572 fn worker_collects_two_activity_registration_names() -> Result<(), WorkerError> {
573 let worker = two_activity_worker()?;
574 let expected = [String::from("double"), String::from("increment")]
575 .into_iter()
576 .collect::<BTreeSet<_>>();
577
578 assert_eq!(worker.available_handlers(), &expected);
579 assert_eq!(
580 worker.activity_types(),
581 &[String::from("double"), String::from("increment")]
582 );
583 Ok(())
584 }
585
586 #[tokio::test]
587 async fn worker_registers_names_with_session() -> Result<(), WorkerError> {
588 let worker = two_activity_worker()?;
589 let session = worker
590 .run_with_session(ChannelSession {
591 receiver: None,
592 reports: Vec::new(),
593 registered: Vec::new(),
594 })
595 .await?;
596
597 assert_eq!(
598 session.registered,
599 vec![String::from("double"), String::from("increment")]
600 );
601 Ok(())
602 }
603
604 #[tokio::test]
605 async fn shutdown_waits_for_slow_in_flight_activity() -> Result<(), WorkerError> {
606 let workflow_id = WorkflowId::new_v4();
607 let activity_id = ActivityId::from_sequence_position(7);
608 let (sender, receiver) = mpsc::channel(2);
609 sender
610 .send(Ok(WorkerSessionEvent::Task(proto_task(
611 workflow_id,
612 activity_id.clone(),
613 "slow",
614 0,
615 ))))
616 .await
617 .map_err(WorkerError::decode)?;
618 let release = Arc::new(AtomicBool::new(false));
619 let started = Arc::new(AtomicUsize::new(0));
620 let worker = Worker::builder(test_config())
621 .register_activity("slow", {
622 let release = Arc::clone(&release);
623 let started = Arc::clone(&started);
624 move |input: TestInput, context: &ActivityContext| {
625 let release = Arc::clone(&release);
626 let started = Arc::clone(&started);
627 Box::pin(async move {
628 let _ = input;
629 started.fetch_add(1, Ordering::SeqCst);
630 context.cancelled().await;
631 while !release.load(Ordering::SeqCst) {
632 tokio::time::sleep(Duration::from_millis(1)).await;
633 }
634 Ok(TestOutput { value: 1 })
635 })
636 }
637 })?
638 .build()?;
639 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
640 let session = ChannelSession {
641 receiver: Some(receiver),
642 reports: Vec::new(),
643 registered: Vec::new(),
644 };
645 let handle = tokio::spawn(async move {
646 worker
647 .run_with_session_until(session, async {
648 let _ = shutdown_receiver.await;
649 })
650 .await
651 });
652
653 wait_until_started(&started).await;
654 shutdown_sender
655 .send(())
656 .map_err(|()| WorkerError::decode(SendFailed))?;
657 tokio::time::sleep(Duration::from_millis(20)).await;
658 assert!(!handle.is_finished());
659 release.store(true, Ordering::SeqCst);
660 drop(sender);
661 let session = handle.await.map_err(WorkerError::decode)??;
662
663 assert_eq!(session.reports.len(), 1);
664 assert!(matches!(
665 &session.reports[0],
666 RecordedReport::Completed(_, reported_id, _) if reported_id == &activity_id
667 ));
668 Ok(())
669 }
670
671 fn two_activity_worker() -> Result<Worker, WorkerError> {
672 two_activity_worker_with(test_config())
673 }
674
675 fn two_activity_worker_with(config: WorkerConfig) -> Result<Worker, WorkerError> {
676 Worker::builder(config)
677 .register_activity("double", |input: TestInput, context| {
678 Box::pin(async move {
679 let _ = context;
680 Ok(TestOutput {
681 value: input.value * 2,
682 })
683 })
684 })?
685 .register_activity("increment", |input: TestInput, context| {
686 Box::pin(async move {
687 let _ = context;
688 Ok(TestOutput {
689 value: input.value + 1,
690 })
691 })
692 })?
693 .build()
694 }
695
696 fn proto_task(
697 workflow_id: WorkflowId,
698 activity_id: ActivityId,
699 activity_type: &str,
700 value: i32,
701 ) -> ProtoActivityTask {
702 ProtoActivityTask {
703 workflow_id: Some(ProtoWorkflowId::from(workflow_id)),
704 activity_id: Some(ProtoActivityId::from(activity_id)),
705 activity_type: activity_type.to_owned(),
706 input: Some(ProtoPayload::from(Payload::new(
707 ContentType::Json,
708 format!("{{\"value\":{value}}}").into_bytes(),
709 ))),
710 }
711 }
712
713 async fn wait_until_started(started: &AtomicUsize) {
714 while started.load(Ordering::SeqCst) == 0 {
715 tokio::time::sleep(Duration::from_millis(1)).await;
716 }
717 }
718
719 fn test_config() -> WorkerConfig {
720 test_config_with(ReconnectConfig::new(
721 Duration::from_millis(5),
722 Duration::from_millis(20),
723 3,
724 ))
725 }
726
727 fn test_config_with(reconnect: ReconnectConfig) -> WorkerConfig {
728 WorkerConfig::new(
729 "http://127.0.0.1:50051",
730 "payments",
731 "worker-a",
732 1,
733 reconnect,
734 None,
735 )
736 }
737
738 fn slow_reconnect_config() -> WorkerConfig {
739 test_config_with(ReconnectConfig::new(
740 Duration::from_secs(5),
741 Duration::from_secs(10),
742 5,
743 ))
744 }
745
746 #[derive(Debug, thiserror::Error)]
747 #[error("failed to send shutdown signal")]
748 struct SendFailed;
749
750 #[derive(Debug, thiserror::Error)]
751 #[error("expected the worker run to fail")]
752 struct UnexpectedSuccess;
753
754 #[derive(Debug, thiserror::Error)]
755 #[error("expected a completed activity report")]
756 struct UnexpectedReportShape;
757
758 #[derive(Debug)]
760 enum SessionLog {
761 Registered(usize, Vec<String>),
762 Reported(usize, RecordedReport),
763 }
764
765 struct ScriptedSession {
768 index: usize,
769 log: mpsc::UnboundedSender<SessionLog>,
770 events: Vec<Result<WorkerSessionEvent, WorkerError>>,
771 fail_reports: bool,
772 register_denial: Option<tonic::Status>,
773 delay_stream: Option<Duration>,
776 }
777
778 #[async_trait]
779 impl WorkerSession for ScriptedSession {
780 async fn handshake(&mut self, config: &WorkerConfig) -> Result<(), WorkerError> {
781 drop(config.clone());
782 Ok(())
783 }
784
785 async fn register(
786 &mut self,
787 activity_types: Vec<String>,
788 available_handlers: &BTreeSet<String>,
789 ) -> Result<(), WorkerError> {
790 validate_activity_handlers(&activity_types, available_handlers)?;
791 if let Some(denial) = self.register_denial.take() {
792 return Err(WorkerError::Registration {
793 source: Box::new(denial),
794 });
795 }
796 self.log
797 .send(SessionLog::Registered(self.index, activity_types))
798 .map_err(WorkerError::decode)
799 }
800
801 fn receive_tasks(&mut self) -> WorkerTaskStream {
802 let events = std::mem::take(&mut self.events);
803 match self.delay_stream.take() {
804 Some(delay) => Box::pin(
805 stream::once(async move {
806 tokio::time::sleep(delay).await;
807 stream::iter(events)
808 })
809 .flatten(),
810 ),
811 None => Box::pin(stream::iter(events)),
812 }
813 }
814
815 async fn report_result(
816 &mut self,
817 workflow_id: WorkflowId,
818 activity_id: ActivityId,
819 result: Payload,
820 ) -> Result<(), WorkerError> {
821 if self.fail_reports {
822 return Err(WorkerError::Transport {
823 source: tonic::Status::unavailable("transport dropped before result ack"),
824 });
825 }
826 self.log
827 .send(SessionLog::Reported(
828 self.index,
829 RecordedReport::Completed(workflow_id, activity_id, result),
830 ))
831 .map_err(WorkerError::decode)
832 }
833
834 async fn report_failure(
835 &mut self,
836 workflow_id: WorkflowId,
837 activity_id: ActivityId,
838 failure: ActivityError,
839 ) -> Result<(), WorkerError> {
840 if self.fail_reports {
841 return Err(WorkerError::Transport {
842 source: tonic::Status::unavailable("transport dropped before failure ack"),
843 });
844 }
845 self.log
846 .send(SessionLog::Reported(
847 self.index,
848 RecordedReport::Failed(workflow_id, activity_id, failure),
849 ))
850 .map_err(WorkerError::decode)
851 }
852
853 async fn send_heartbeat(
854 &mut self,
855 workflow_id: WorkflowId,
856 activity_id: ActivityId,
857 progress: Option<Payload>,
858 ) -> Result<(), WorkerError> {
859 drop((workflow_id, activity_id, progress));
860 Ok(())
861 }
862 }
863
864 #[tokio::test]
865 async fn establishment_retries_transient_failures_until_attempts_exhausted()
866 -> Result<(), WorkerError> {
867 let worker = two_activity_worker()?;
868 let attempts = Arc::new(AtomicUsize::new(0));
869 let connect = {
870 let attempts = Arc::clone(&attempts);
871 move || {
872 attempts.fetch_add(1, Ordering::SeqCst);
873 async move {
874 Err::<ScriptedSession, _>(WorkerError::Transport {
875 source: tonic::Status::unavailable("engine unreachable"),
876 })
877 }
878 }
879 };
880
881 let result = worker
882 .run_with_connector_until(connect, std::future::pending::<()>())
883 .await;
884
885 assert_eq!(attempts.load(Ordering::SeqCst), 3);
886 let Err(error) = result else {
887 return Err(WorkerError::decode(UnexpectedSuccess));
888 };
889 assert!(error.is_retryable());
890 assert!(matches!(
891 error.grpc_status().map(tonic::Status::code),
892 Some(tonic::Code::Unavailable)
893 ));
894 Ok(())
895 }
896
897 #[tokio::test]
898 async fn establishment_denial_surfaces_after_one_attempt() -> Result<(), WorkerError> {
899 let worker = two_activity_worker()?;
900 let attempts = Arc::new(AtomicUsize::new(0));
901 let (log_sender, log_receiver) = mpsc::unbounded_channel();
902 let connect = {
903 let attempts = Arc::clone(&attempts);
904 move || {
905 attempts.fetch_add(1, Ordering::SeqCst);
906 let log = log_sender.clone();
907 async move {
908 Ok(ScriptedSession {
909 index: 1,
910 log,
911 events: Vec::new(),
912 fail_reports: false,
913 register_denial: Some(tonic::Status::permission_denied(
914 "namespace `payments` is not granted to subject `worker-a`",
915 )),
916 delay_stream: None,
917 })
918 }
919 }
920 };
921
922 let result = worker
923 .run_with_connector_until(connect, std::future::pending::<()>())
924 .await;
925
926 assert_eq!(attempts.load(Ordering::SeqCst), 1);
927 let Err(error) = result else {
928 return Err(WorkerError::decode(UnexpectedSuccess));
929 };
930 assert!(!error.is_retryable());
931 assert!(matches!(
932 error.grpc_status().map(tonic::Status::code),
933 Some(tonic::Code::PermissionDenied)
934 ));
935 assert_eq!(
936 error.grpc_status().map(tonic::Status::message),
937 Some("namespace `payments` is not granted to subject `worker-a`")
938 );
939 drop(log_receiver);
940 Ok(())
941 }
942
943 #[tokio::test]
944 async fn mid_run_drop_reconnects_re_registers_and_re_reports_unacked() -> Result<(), WorkerError>
945 {
946 let workflow_id = WorkflowId::new_v4();
947 let activity_id = ActivityId::from_sequence_position(3);
948 let worker = two_activity_worker()?;
949 let attempts = Arc::new(AtomicUsize::new(0));
950 let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
951 let connect = {
952 let attempts = Arc::clone(&attempts);
953 let log_sender = log_sender.clone();
954 let workflow_id = workflow_id.clone();
955 let activity_id = activity_id.clone();
956 move || {
957 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
958 let log = log_sender.clone();
959 let task = proto_task(workflow_id.clone(), activity_id.clone(), "double", 21);
960 async move {
961 if attempt == 1 {
962 Ok(ScriptedSession {
963 index: 1,
964 log,
965 events: vec![Ok(WorkerSessionEvent::Task(task))],
966 fail_reports: true,
967 register_denial: None,
968 delay_stream: None,
969 })
970 } else if attempt == 2 {
971 Ok(ScriptedSession {
972 index: attempt,
973 log,
974 events: Vec::new(),
975 fail_reports: false,
976 register_denial: None,
977 delay_stream: None,
978 })
979 } else {
980 Ok(ScriptedSession {
983 index: attempt,
984 log,
985 events: Vec::new(),
986 fail_reports: false,
987 register_denial: Some(tonic::Status::permission_denied(
988 "namespace `payments` revoked for subject `worker-a`",
989 )),
990 delay_stream: None,
991 })
992 }
993 }
994 }
995 };
996
997 let result = worker
998 .run_with_connector_until(connect, std::future::pending::<()>())
999 .await;
1000
1001 drop(log_sender);
1002 let mut registrations = Vec::new();
1003 let mut reports = Vec::new();
1004 while let Some(entry) = log_receiver.recv().await {
1005 match entry {
1006 SessionLog::Registered(index, types) => registrations.push((index, types)),
1007 SessionLog::Reported(index, report) => reports.push((index, report)),
1008 }
1009 }
1010 let Err(error) = result else {
1011 return Err(WorkerError::decode(UnexpectedSuccess));
1012 };
1013 assert!(!error.is_retryable());
1014 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1015 let expected_types = vec![String::from("double"), String::from("increment")];
1016 assert_eq!(
1017 registrations,
1018 vec![(1, expected_types.clone()), (2, expected_types)]
1019 );
1020 assert_eq!(reports.len(), 1);
1021 let (session_index, report) = &reports[0];
1022 assert_eq!(*session_index, 2);
1023 let RecordedReport::Completed(reported_workflow, reported_id, payload) = report else {
1024 return Err(WorkerError::decode(UnexpectedReportShape));
1025 };
1026 assert_eq!(reported_workflow, &workflow_id);
1027 assert_eq!(reported_id, &activity_id);
1028 let output: TestOutput =
1029 serde_json::from_slice(payload.bytes()).map_err(WorkerError::decode)?;
1030 assert_eq!(output.value, 42);
1031 Ok(())
1032 }
1033
1034 #[tokio::test]
1035 async fn mid_run_drop_re_reports_unacked_results_for_all_workflows() -> Result<(), WorkerError>
1036 {
1037 let first_workflow = WorkflowId::new_v4();
1038 let second_workflow = WorkflowId::new_v4();
1039 let activity_id = ActivityId::from_sequence_position(3);
1040 let worker = two_activity_worker()?;
1041 let attempts = Arc::new(AtomicUsize::new(0));
1042 let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1043 let connect = {
1044 let attempts = Arc::clone(&attempts);
1045 let log_sender = log_sender.clone();
1046 let first_workflow = first_workflow.clone();
1047 let second_workflow = second_workflow.clone();
1048 let activity_id = activity_id.clone();
1049 move || {
1050 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1051 let log = log_sender.clone();
1052 let first_task =
1053 proto_task(first_workflow.clone(), activity_id.clone(), "double", 10);
1054 let second_task =
1055 proto_task(second_workflow.clone(), activity_id.clone(), "double", 20);
1056 async move {
1057 if attempt == 1 {
1058 Ok(ScriptedSession {
1059 index: 1,
1060 log,
1061 events: vec![
1062 Ok(WorkerSessionEvent::Task(first_task)),
1063 Ok(WorkerSessionEvent::Task(second_task)),
1064 ],
1065 fail_reports: true,
1066 register_denial: None,
1067 delay_stream: None,
1068 })
1069 } else if attempt == 2 {
1070 Ok(ScriptedSession {
1071 index: attempt,
1072 log,
1073 events: Vec::new(),
1074 fail_reports: false,
1075 register_denial: None,
1076 delay_stream: None,
1077 })
1078 } else {
1079 Ok(ScriptedSession {
1082 index: attempt,
1083 log,
1084 events: Vec::new(),
1085 fail_reports: false,
1086 register_denial: Some(tonic::Status::permission_denied(
1087 "namespace `payments` revoked for subject `worker-a`",
1088 )),
1089 delay_stream: None,
1090 })
1091 }
1092 }
1093 }
1094 };
1095
1096 let result = worker
1097 .run_with_connector_until(connect, std::future::pending::<()>())
1098 .await;
1099
1100 drop(log_sender);
1101 let mut reports = Vec::new();
1102 while let Some(entry) = log_receiver.recv().await {
1103 if let SessionLog::Reported(index, report) = entry {
1104 reports.push((index, report));
1105 }
1106 }
1107 let Err(error) = result else {
1108 return Err(WorkerError::decode(UnexpectedSuccess));
1109 };
1110 assert!(!error.is_retryable());
1111 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1112 assert_eq!(
1113 reports.len(),
1114 2,
1115 "both workflows' colliding sequence-position results must be re-reported"
1116 );
1117 let mut reported_workflows = Vec::new();
1118 for (session_index, report) in &reports {
1119 assert_eq!(*session_index, 2, "re-reports must land on the new session");
1120 let RecordedReport::Completed(reported_workflow, reported_id, _) = report else {
1121 return Err(WorkerError::decode(UnexpectedReportShape));
1122 };
1123 assert_eq!(reported_id, &activity_id);
1124 reported_workflows.push(reported_workflow.clone());
1125 }
1126 assert!(reported_workflows.contains(&first_workflow));
1127 assert!(reported_workflows.contains(&second_workflow));
1128 Ok(())
1129 }
1130
1131 #[tokio::test]
1132 async fn shutdown_during_recovery_establishment_returns_original_drop_error()
1133 -> Result<(), WorkerError> {
1134 let worker = two_activity_worker()?;
1135 let attempts = Arc::new(AtomicUsize::new(0));
1136 let notify = Arc::new(Notify::new());
1137 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1138 let connect = {
1139 let attempts = Arc::clone(&attempts);
1140 let notify = Arc::clone(¬ify);
1141 move || {
1142 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1143 let notify = Arc::clone(¬ify);
1144 let log = log_sender.clone();
1145 async move {
1146 if attempt == 1 {
1147 Ok(ScriptedSession {
1148 index: 1,
1149 log,
1150 events: vec![Err(WorkerError::Transport {
1151 source: tonic::Status::unavailable("stream reset by peer"),
1152 })],
1153 fail_reports: false,
1154 register_denial: None,
1155 delay_stream: None,
1156 })
1157 } else {
1158 notify.notify_one();
1162 std::future::pending::<()>().await;
1163 Err(WorkerError::Transport {
1164 source: tonic::Status::unavailable("unreachable"),
1165 })
1166 }
1167 }
1168 }
1169 };
1170 let shutdown = {
1171 let notify = Arc::clone(¬ify);
1172 async move {
1173 notify.notified().await;
1174 }
1175 };
1176
1177 let run = worker.run_with_connector_until(connect, shutdown);
1178 let result = tokio::time::timeout(Duration::from_secs(5), run)
1179 .await
1180 .map_err(WorkerError::decode)?;
1181
1182 assert_eq!(attempts.load(Ordering::SeqCst), 2);
1183 let Err(error) = result else {
1184 return Err(WorkerError::decode(UnexpectedSuccess));
1185 };
1186 assert!(matches!(
1187 error.grpc_status().map(tonic::Status::code),
1188 Some(tonic::Code::Unavailable)
1189 ));
1190 assert_eq!(
1191 error.grpc_status().map(tonic::Status::message),
1192 Some("stream reset by peer"),
1193 "shutdown during recovery establishment must surface the original drop error"
1194 );
1195 drop(log_receiver);
1196 Ok(())
1197 }
1198
1199 #[tokio::test(start_paused = true)]
1203 async fn mid_run_drop_budget_exhaustion_surfaces_last_drop_error() -> Result<(), WorkerError> {
1204 let worker = two_activity_worker()?;
1205 let attempts = Arc::new(AtomicUsize::new(0));
1206 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1207 let connect = {
1208 let attempts = Arc::clone(&attempts);
1209 move || {
1210 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1211 let log = log_sender.clone();
1212 async move {
1213 Ok(ScriptedSession {
1214 index: attempt,
1215 log,
1216 events: vec![Err(WorkerError::Transport {
1217 source: tonic::Status::unavailable("stream reset by peer"),
1218 })],
1219 fail_reports: false,
1220 register_denial: None,
1221 delay_stream: None,
1222 })
1223 }
1224 }
1225 };
1226
1227 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1228 let result = tokio::time::timeout(Duration::from_secs(5), run)
1229 .await
1230 .map_err(WorkerError::decode)?;
1231
1232 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1235 let Err(error) = result else {
1236 return Err(WorkerError::decode(UnexpectedSuccess));
1237 };
1238 assert!(error.is_retryable());
1239 assert!(matches!(
1240 error.grpc_status().map(tonic::Status::code),
1241 Some(tonic::Code::Unavailable)
1242 ));
1243 assert_eq!(
1244 error.grpc_status().map(tonic::Status::message),
1245 Some("stream reset by peer")
1246 );
1247 drop(log_receiver);
1248 Ok(())
1249 }
1250
1251 #[tokio::test]
1252 async fn mid_run_denial_surfaces_without_reconnect() -> Result<(), WorkerError> {
1253 let worker = two_activity_worker()?;
1254 let attempts = Arc::new(AtomicUsize::new(0));
1255 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1256 let connect = {
1257 let attempts = Arc::clone(&attempts);
1258 move || {
1259 attempts.fetch_add(1, Ordering::SeqCst);
1260 let log = log_sender.clone();
1261 async move {
1262 Ok(ScriptedSession {
1263 index: 1,
1264 log,
1265 events: vec![Err(WorkerError::Transport {
1266 source: tonic::Status::permission_denied(
1267 "namespace `payments` revoked for subject `worker-a`",
1268 ),
1269 })],
1270 fail_reports: false,
1271 register_denial: None,
1272 delay_stream: None,
1273 })
1274 }
1275 }
1276 };
1277
1278 let result = worker
1279 .run_with_connector_until(connect, std::future::pending::<()>())
1280 .await;
1281
1282 assert_eq!(attempts.load(Ordering::SeqCst), 1);
1283 let Err(error) = result else {
1284 return Err(WorkerError::decode(UnexpectedSuccess));
1285 };
1286 assert!(!error.is_retryable());
1287 assert!(matches!(
1288 error.grpc_status().map(tonic::Status::code),
1289 Some(tonic::Code::PermissionDenied)
1290 ));
1291 assert_eq!(
1292 error.grpc_status().map(tonic::Status::message),
1293 Some("namespace `payments` revoked for subject `worker-a`")
1294 );
1295 drop(log_receiver);
1296 Ok(())
1297 }
1298
1299 #[tokio::test]
1300 async fn shutdown_during_establishment_backoff_returns_promptly() -> Result<(), WorkerError> {
1301 let worker = two_activity_worker_with(slow_reconnect_config())?;
1302 let attempts = Arc::new(AtomicUsize::new(0));
1303 let notify = Arc::new(Notify::new());
1304 let connect = {
1305 let attempts = Arc::clone(&attempts);
1306 let notify = Arc::clone(¬ify);
1307 move || {
1308 attempts.fetch_add(1, Ordering::SeqCst);
1309 notify.notify_one();
1310 async move {
1311 Err::<ScriptedSession, _>(WorkerError::Transport {
1312 source: tonic::Status::unavailable("engine unreachable"),
1313 })
1314 }
1315 }
1316 };
1317 let shutdown = {
1318 let notify = Arc::clone(¬ify);
1319 async move {
1320 notify.notified().await;
1321 }
1322 };
1323
1324 let run = worker.run_with_connector_until(connect, shutdown);
1325 tokio::time::timeout(Duration::from_millis(500), run)
1326 .await
1327 .map_err(WorkerError::decode)??;
1328
1329 assert_eq!(attempts.load(Ordering::SeqCst), 1);
1330 Ok(())
1331 }
1332
1333 #[tokio::test]
1334 async fn shutdown_during_mid_run_drop_backoff_returns_promptly() -> Result<(), WorkerError> {
1335 let worker = two_activity_worker_with(slow_reconnect_config())?;
1336 let attempts = Arc::new(AtomicUsize::new(0));
1337 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1338 let connect = {
1339 let attempts = Arc::clone(&attempts);
1340 move || {
1341 attempts.fetch_add(1, Ordering::SeqCst);
1342 let log = log_sender.clone();
1343 async move {
1344 Ok(ScriptedSession {
1345 index: 1,
1346 log,
1347 events: vec![Err(WorkerError::Transport {
1348 source: tonic::Status::unavailable("stream reset by peer"),
1349 })],
1350 fail_reports: false,
1351 register_denial: None,
1352 delay_stream: None,
1353 })
1354 }
1355 }
1356 };
1357 let shutdown = async {
1358 tokio::time::sleep(Duration::from_millis(50)).await;
1359 };
1360
1361 let run = worker.run_with_connector_until(connect, shutdown);
1362 let result = tokio::time::timeout(Duration::from_millis(500), run)
1363 .await
1364 .map_err(WorkerError::decode)?;
1365
1366 assert_eq!(attempts.load(Ordering::SeqCst), 1);
1367 let Err(error) = result else {
1368 return Err(WorkerError::decode(UnexpectedSuccess));
1369 };
1370 assert!(error.is_retryable());
1371 assert!(matches!(
1372 error.grpc_status().map(tonic::Status::code),
1373 Some(tonic::Code::Unavailable)
1374 ));
1375 drop(log_receiver);
1376 Ok(())
1377 }
1378
1379 #[tokio::test]
1380 async fn served_tasks_reset_drop_budget_across_cycles() -> Result<(), WorkerError> {
1381 let workflow_id = WorkflowId::new_v4();
1382 let activity_id = ActivityId::from_sequence_position(7);
1383 let worker = two_activity_worker_with(test_config_with(ReconnectConfig::new(
1386 Duration::from_millis(1),
1387 Duration::from_secs(3600),
1388 2,
1389 )))?;
1390 let attempts = Arc::new(AtomicUsize::new(0));
1391 let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1392 let connect = {
1393 let attempts = Arc::clone(&attempts);
1394 let log_sender = log_sender.clone();
1395 let workflow_id = workflow_id.clone();
1396 let activity_id = activity_id.clone();
1397 move || {
1398 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1399 let log = log_sender.clone();
1400 let task = proto_task(workflow_id.clone(), activity_id.clone(), "double", 21);
1401 async move {
1402 if attempt <= 4 {
1403 Ok(ScriptedSession {
1404 index: attempt,
1405 log,
1406 events: vec![
1407 Ok(WorkerSessionEvent::Task(task)),
1408 Err(WorkerError::Transport {
1409 source: tonic::Status::unavailable("stream reset by peer"),
1410 }),
1411 ],
1412 fail_reports: false,
1413 register_denial: None,
1414 delay_stream: None,
1415 })
1416 } else {
1417 Ok(ScriptedSession {
1418 index: attempt,
1419 log,
1420 events: Vec::new(),
1421 fail_reports: false,
1422 register_denial: Some(tonic::Status::permission_denied(
1423 "namespace `payments` revoked for subject `worker-a`",
1424 )),
1425 delay_stream: None,
1426 })
1427 }
1428 }
1429 }
1430 };
1431
1432 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1433 let result = tokio::time::timeout(Duration::from_secs(5), run)
1434 .await
1435 .map_err(WorkerError::decode)?;
1436
1437 drop(log_sender);
1438 let mut registrations = 0_usize;
1439 while let Some(entry) = log_receiver.recv().await {
1440 if let SessionLog::Registered(..) = entry {
1441 registrations += 1;
1442 }
1443 }
1444 assert_eq!(attempts.load(Ordering::SeqCst), 5);
1449 assert_eq!(registrations, 4);
1450 let Err(error) = result else {
1451 return Err(WorkerError::decode(UnexpectedSuccess));
1452 };
1453 assert!(!error.is_retryable());
1454 assert!(matches!(
1455 error.grpc_status().map(tonic::Status::code),
1456 Some(tonic::Code::PermissionDenied)
1457 ));
1458 Ok(())
1459 }
1460
1461 #[tokio::test(start_paused = true)]
1462 async fn session_outliving_max_backoff_resets_drop_budget() -> Result<(), WorkerError> {
1463 let worker = two_activity_worker_with(test_config_with(ReconnectConfig::new(
1464 Duration::from_millis(5),
1465 Duration::from_millis(20),
1466 2,
1467 )))?;
1468 let attempts = Arc::new(AtomicUsize::new(0));
1469 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1470 let connect = {
1471 let attempts = Arc::clone(&attempts);
1472 move || {
1473 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1474 let log = log_sender.clone();
1475 async move {
1476 Ok(ScriptedSession {
1477 index: attempt,
1478 log,
1479 events: vec![Err(WorkerError::Transport {
1480 source: tonic::Status::unavailable("stream reset by peer"),
1481 })],
1482 fail_reports: false,
1483 register_denial: None,
1484 delay_stream: (attempt == 2).then_some(Duration::from_millis(30)),
1488 })
1489 }
1490 }
1491 };
1492
1493 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1494 let result = tokio::time::timeout(Duration::from_secs(5), run)
1495 .await
1496 .map_err(WorkerError::decode)?;
1497
1498 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1505 let Err(error) = result else {
1506 return Err(WorkerError::decode(UnexpectedSuccess));
1507 };
1508 assert!(error.is_retryable());
1509 assert!(matches!(
1510 error.grpc_status().map(tonic::Status::code),
1511 Some(tonic::Code::Unavailable)
1512 ));
1513 drop(log_receiver);
1514 Ok(())
1515 }
1516
1517 #[tokio::test(start_paused = true)]
1524 async fn post_drop_drain_time_does_not_reset_drop_budget() -> Result<(), WorkerError> {
1525 let workflow_id = WorkflowId::new_v4();
1526 let activity_id = ActivityId::from_sequence_position(9);
1527 let config = WorkerConfig::new(
1530 "http://127.0.0.1:50051",
1531 "payments",
1532 "worker-a",
1533 2,
1534 ReconnectConfig::new(Duration::from_millis(5), Duration::from_millis(20), 2),
1535 None,
1536 );
1537 let worker = Worker::builder(config)
1538 .register_activity("slow", |input: TestInput, context: &ActivityContext| {
1539 let _ = (input, context);
1540 Box::pin(async move {
1541 tokio::time::sleep(Duration::from_millis(60)).await;
1544 Ok(TestOutput { value: 1 })
1545 })
1546 })?
1547 .build()?;
1548 let attempts = Arc::new(AtomicUsize::new(0));
1549 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1550 let connect = {
1551 let attempts = Arc::clone(&attempts);
1552 let workflow_id = workflow_id.clone();
1553 let activity_id = activity_id.clone();
1554 move || {
1555 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1556 let log = log_sender.clone();
1557 let task = proto_task(workflow_id.clone(), activity_id.clone(), "slow", 1);
1558 async move {
1559 if attempt == 1 {
1560 Ok(ScriptedSession {
1564 index: 1,
1565 log,
1566 events: vec![Err(WorkerError::Transport {
1567 source: tonic::Status::unavailable("stream reset by peer"),
1568 })],
1569 fail_reports: false,
1570 register_denial: None,
1571 delay_stream: None,
1572 })
1573 } else {
1574 Ok(ScriptedSession {
1579 index: attempt,
1580 log,
1581 events: vec![
1582 Ok(WorkerSessionEvent::Task(task)),
1583 Err(WorkerError::Transport {
1584 source: tonic::Status::unavailable("stream reset by peer"),
1585 }),
1586 ],
1587 fail_reports: true,
1588 register_denial: None,
1589 delay_stream: None,
1590 })
1591 }
1592 }
1593 }
1594 };
1595
1596 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1597 let result = tokio::time::timeout(Duration::from_secs(5), run)
1598 .await
1599 .map_err(WorkerError::decode)?;
1600
1601 assert_eq!(attempts.load(Ordering::SeqCst), 2);
1608 let Err(error) = result else {
1609 return Err(WorkerError::decode(UnexpectedSuccess));
1610 };
1611 assert!(error.is_retryable());
1612 assert!(matches!(
1613 error.grpc_status().map(tonic::Status::code),
1614 Some(tonic::Code::Unavailable)
1615 ));
1616 drop(log_receiver);
1617 Ok(())
1618 }
1619
1620 #[tokio::test]
1621 async fn clean_close_reconnects_re_registers_and_keeps_serving() -> Result<(), WorkerError> {
1622 let workflow_id = WorkflowId::new_v4();
1623 let first_activity = ActivityId::from_sequence_position(1);
1624 let second_activity = ActivityId::from_sequence_position(2);
1625 let worker = two_activity_worker()?;
1626 let attempts = Arc::new(AtomicUsize::new(0));
1627 let (log_sender, mut log_receiver) = mpsc::unbounded_channel();
1628 let connect = {
1629 let attempts = Arc::clone(&attempts);
1630 let log_sender = log_sender.clone();
1631 let workflow_id = workflow_id.clone();
1632 let first_activity = first_activity.clone();
1633 let second_activity = second_activity.clone();
1634 move || {
1635 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1636 let log = log_sender.clone();
1637 let first_task =
1638 proto_task(workflow_id.clone(), first_activity.clone(), "double", 10);
1639 let second_task =
1640 proto_task(workflow_id.clone(), second_activity.clone(), "double", 20);
1641 async move {
1642 match attempt {
1643 1 => Ok(ScriptedSession {
1646 index: 1,
1647 log,
1648 events: vec![Ok(WorkerSessionEvent::Task(first_task))],
1649 fail_reports: false,
1650 register_denial: None,
1651 delay_stream: None,
1652 }),
1653 2 => Ok(ScriptedSession {
1654 index: 2,
1655 log,
1656 events: vec![Ok(WorkerSessionEvent::Task(second_task))],
1657 fail_reports: false,
1658 register_denial: None,
1659 delay_stream: None,
1660 }),
1661 _ => Ok(ScriptedSession {
1662 index: attempt,
1663 log,
1664 events: Vec::new(),
1665 fail_reports: false,
1666 register_denial: Some(tonic::Status::permission_denied(
1667 "namespace `payments` revoked for subject `worker-a`",
1668 )),
1669 delay_stream: None,
1670 }),
1671 }
1672 }
1673 }
1674 };
1675
1676 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1677 let result = tokio::time::timeout(Duration::from_secs(5), run)
1678 .await
1679 .map_err(WorkerError::decode)?;
1680
1681 drop(log_sender);
1682 let mut registrations = Vec::new();
1683 let mut reports = Vec::new();
1684 while let Some(entry) = log_receiver.recv().await {
1685 match entry {
1686 SessionLog::Registered(index, types) => registrations.push((index, types)),
1687 SessionLog::Reported(index, report) => reports.push((index, report)),
1688 }
1689 }
1690 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1694 let expected_types = vec![String::from("double"), String::from("increment")];
1695 assert_eq!(
1696 registrations,
1697 vec![(1, expected_types.clone()), (2, expected_types)]
1698 );
1699 assert_eq!(reports.len(), 3);
1700 assert!(matches!(
1701 &reports[0],
1702 (1, RecordedReport::Completed(_, id, _)) if id == &first_activity
1703 ));
1704 assert!(matches!(
1705 &reports[1],
1706 (2, RecordedReport::Completed(_, id, _)) if id == &first_activity
1707 ));
1708 assert!(matches!(
1709 &reports[2],
1710 (2, RecordedReport::Completed(_, id, _)) if id == &second_activity
1711 ));
1712 let Err(error) = result else {
1713 return Err(WorkerError::decode(UnexpectedSuccess));
1714 };
1715 assert!(!error.is_retryable());
1716 assert!(matches!(
1717 error.grpc_status().map(tonic::Status::code),
1718 Some(tonic::Code::PermissionDenied)
1719 ));
1720 Ok(())
1721 }
1722
1723 #[tokio::test(start_paused = true)]
1724 async fn clean_close_loop_exhausts_drop_budget_with_classified_error() -> Result<(), WorkerError>
1725 {
1726 let worker = two_activity_worker()?;
1727 let attempts = Arc::new(AtomicUsize::new(0));
1728 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1729 let connect = {
1730 let attempts = Arc::clone(&attempts);
1731 move || {
1732 let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
1733 let log = log_sender.clone();
1734 async move {
1735 Ok(ScriptedSession {
1736 index: attempt,
1737 log,
1738 events: Vec::new(),
1739 fail_reports: false,
1740 register_denial: None,
1741 delay_stream: None,
1742 })
1743 }
1744 }
1745 };
1746
1747 let run = worker.run_with_connector_until(connect, std::future::pending::<()>());
1748 let result = tokio::time::timeout(Duration::from_secs(5), run)
1749 .await
1750 .map_err(WorkerError::decode)?;
1751
1752 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1757 let Err(error) = result else {
1758 return Err(WorkerError::decode(UnexpectedSuccess));
1759 };
1760 assert!(matches!(error, WorkerError::CleanCloseExhausted));
1761 assert!(error.to_string().contains("closed the stream cleanly"));
1762 drop(log_receiver);
1763 Ok(())
1764 }
1765
1766 #[tokio::test]
1767 async fn shutdown_during_clean_close_backoff_returns_ok_promptly() -> Result<(), WorkerError> {
1768 let worker = two_activity_worker_with(slow_reconnect_config())?;
1769 let attempts = Arc::new(AtomicUsize::new(0));
1770 let (log_sender, log_receiver) = mpsc::unbounded_channel();
1771 let connect = {
1772 let attempts = Arc::clone(&attempts);
1773 move || {
1774 attempts.fetch_add(1, Ordering::SeqCst);
1775 let log = log_sender.clone();
1776 async move {
1777 Ok(ScriptedSession {
1778 index: 1,
1779 log,
1780 events: Vec::new(),
1781 fail_reports: false,
1782 register_denial: None,
1783 delay_stream: None,
1784 })
1785 }
1786 }
1787 };
1788 let shutdown = async {
1789 tokio::time::sleep(Duration::from_millis(50)).await;
1790 };
1791
1792 let run = worker.run_with_connector_until(connect, shutdown);
1795 tokio::time::timeout(Duration::from_millis(500), run)
1796 .await
1797 .map_err(WorkerError::decode)??;
1798
1799 assert_eq!(attempts.load(Ordering::SeqCst), 1);
1800 drop(log_receiver);
1801 Ok(())
1802 }
1803}