Skip to main content

aion_client/
stream.rs

1//! Event subscription `Stream` and resumption.
2
3use std::num::NonZeroU64;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use aion_core::{Event, WorkflowFilter, WorkflowId};
9use aion_proto::{
10    FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, ProtoWorkflowId,
11    SubscriptionRequest, subscription_request,
12};
13use futures::Stream;
14use futures::future::BoxFuture;
15use futures::stream::BoxStream;
16
17use crate::error::ClientError;
18use crate::transport::{SubscriptionAttempt, WorkflowTransport};
19
20/// Boxed event stream returned by subscribe operations.
21pub type EventStream = Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>;
22
23/// Builder for the AW-owned subscription variants supported by the SDK.
24#[derive(Clone, Debug, PartialEq, Eq)]
25pub enum SubscribeTarget {
26    /// Subscribe to events for one workflow.
27    Workflow {
28        /// Workflow whose events are requested.
29        workflow_id: WorkflowId,
30    },
31    /// Subscribe to workflow metadata selected events.
32    Filtered {
33        /// Workflow metadata filter used for the subscription.
34        filter: WorkflowFilter,
35    },
36    /// Subscribe to all visible events in the client's namespace.
37    Firehose,
38}
39
40impl SubscribeTarget {
41    pub(crate) fn request(&self, namespace: &str) -> SubscriptionRequest {
42        match self {
43            Self::Workflow { workflow_id } => SubscriptionRequest {
44                subscription: Some(subscription_request::Subscription::PerWorkflow(
45                    PerWorkflowSubscription {
46                        namespace: namespace.to_owned(),
47                        workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
48                        resume_from_seq: None,
49                    },
50                )),
51            },
52            Self::Filtered { filter } => SubscriptionRequest {
53                subscription: Some(subscription_request::Subscription::Filtered(
54                    FilteredSubscription {
55                        namespace: namespace.to_owned(),
56                        workflow_type: filter.workflow_type.clone(),
57                        status: filter
58                            .status
59                            .map(|status| aion_proto::ProtoWorkflowStatus::from(status) as i32),
60                        namespace_selector: None,
61                    },
62                )),
63            },
64            Self::Firehose => SubscriptionRequest {
65                subscription: Some(subscription_request::Subscription::Firehose(
66                    FirehoseSubscription {
67                        namespace: namespace.to_owned(),
68                    },
69                )),
70            },
71        }
72    }
73}
74
75/// Reconnecting subscription stream.
76///
77/// Resumption is per-workflow only: per-workflow `seq` is the only ordering
78/// that exists, so only [`SubscribeTarget::Workflow`] streams track a cursor
79/// (`resume_from_seq = last delivered + 1`) and deduplicate by sequence
80/// number. Filtered and firehose streams are live-only by design: a
81/// transient disconnect after at least one delivered event ends the stream
82/// with an honest [`ClientError::Unavailable`] instead of silently
83/// reattaching a gapped stream; reconnect-live-only is allowed only while
84/// nothing has been delivered yet.
85///
86/// Connect-failure contract (cross-SDK): a failed subscription attach is
87/// classified exactly like a mid-stream drop. [`ClientError::Unavailable`]
88/// (transport-level connect failure, DNS/TLS/socket failure, abnormal close)
89/// is retryable and the stream re-attaches — on the initial attach as well as
90/// after delivered events — until the caller drops the stream; every other
91/// taxonomy error (`Unauthenticated`, `NamespaceDenied`, `NotFound`,
92/// `InvalidArgument`, `Server`, ...) is terminal immediately.
93pub struct ResumingEventStream {
94    transport: Arc<dyn WorkflowTransport>,
95    namespace: String,
96    target: SubscribeTarget,
97    last_seq: Option<u64>,
98    delivered_any: bool,
99    current: Option<BoxStream<'static, Result<Event, ClientError>>>,
100    pending_subscribe: Option<BoxFuture<'static, Result<SubscriptionAttempt, ClientError>>>,
101    terminal_error: Option<ClientError>,
102    finished: bool,
103}
104
105impl ResumingEventStream {
106    /// Creates a subscription stream for `target`.
107    #[must_use]
108    pub fn new(
109        transport: Arc<dyn WorkflowTransport>,
110        namespace: impl Into<String>,
111        target: SubscribeTarget,
112    ) -> Self {
113        Self {
114            transport,
115            namespace: namespace.into(),
116            target,
117            last_seq: None,
118            delivered_any: false,
119            current: None,
120            pending_subscribe: None,
121            terminal_error: None,
122            finished: false,
123        }
124    }
125
126    /// Creates a per-workflow subscription stream that attaches with an
127    /// explicit starting cursor.
128    ///
129    /// `resume_from` is the first per-workflow sequence number wanted
130    /// (`resume_from_seq` on the wire); `1` replays the full recorded
131    /// history before splicing into the live stream. The type makes the
132    /// invalid cursor `0` unrepresentable.
133    #[must_use]
134    pub fn from_sequence(
135        transport: Arc<dyn WorkflowTransport>,
136        namespace: impl Into<String>,
137        workflow_id: WorkflowId,
138        resume_from: NonZeroU64,
139    ) -> Self {
140        let mut stream = Self::new(
141            transport,
142            namespace,
143            SubscribeTarget::Workflow { workflow_id },
144        );
145        // The cursor sent on (re)attach is always `last_seq + 1`, so seeding
146        // `last_seq = resume_from - 1` makes the first attach request exactly
147        // `resume_from` and drops anything older on the dedupe path.
148        stream.last_seq = Some(resume_from.get() - 1);
149        stream
150    }
151
152    fn is_per_workflow(&self) -> bool {
153        matches!(self.target, SubscribeTarget::Workflow { .. })
154    }
155
156    fn start_subscribe(&mut self) {
157        let transport = Arc::clone(&self.transport);
158        let request = self.target.request(&self.namespace);
159        // Only per-workflow streams carry a resume cursor; filtered and
160        // firehose reattach live-only (and only before any delivery).
161        let resume_from_sequence = if self.is_per_workflow() {
162            self.last_seq.map(|seq| seq.saturating_add(1))
163        } else {
164            None
165        };
166        self.pending_subscribe = Some(Box::pin(async move {
167            transport.subscribe(request, resume_from_sequence).await
168        }));
169    }
170}
171
172impl Stream for ResumingEventStream {
173    type Item = Result<Event, ClientError>;
174
175    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176        let this = self.get_mut();
177        loop {
178            if this.finished {
179                return Poll::Ready(None);
180            }
181
182            if let Some(error) = this.terminal_error.take() {
183                this.finished = true;
184                return Poll::Ready(Some(Err(error)));
185            }
186
187            if this.current.is_none() && this.pending_subscribe.is_none() {
188                this.start_subscribe();
189            }
190
191            if let Some(pending) = this.pending_subscribe.as_mut() {
192                match pending.as_mut().poll(cx) {
193                    Poll::Pending => return Poll::Pending,
194                    Poll::Ready(Ok(attempt)) => {
195                        this.pending_subscribe = None;
196                        this.current = Some(attempt.events);
197                    }
198                    Poll::Ready(Err(error)) => {
199                        // Cross-SDK connect-failure contract: an attach
200                        // failure is classified exactly like a mid-stream
201                        // drop. `Unavailable` is retryable — per-workflow
202                        // streams reconnect with their cursor, live-only
203                        // streams reconnect only while nothing has been
204                        // delivered. Every other taxonomy error is terminal.
205                        this.pending_subscribe = None;
206                        if is_retryable(&error) && (this.is_per_workflow() || !this.delivered_any) {
207                            continue;
208                        }
209                        this.finished = true;
210                        return Poll::Ready(Some(Err(error)));
211                    }
212                }
213            }
214
215            let Some(current) = this.current.as_mut() else {
216                continue;
217            };
218            match current.as_mut().poll_next(cx) {
219                Poll::Pending => return Poll::Pending,
220                Poll::Ready(Some(Ok(event))) => {
221                    if this.is_per_workflow() {
222                        // Sequence-number dedupe is coherent only within one
223                        // workflow's history.
224                        if this.last_seq.is_some_and(|seq| event.seq() <= seq) {
225                            continue;
226                        }
227                        this.last_seq = Some(event.seq());
228                    }
229                    this.delivered_any = true;
230                    return Poll::Ready(Some(Ok(event)));
231                }
232                Poll::Ready(Some(Err(error))) => {
233                    this.current = None;
234                    if is_retryable(&error) {
235                        if this.is_per_workflow() {
236                            continue;
237                        }
238                        if !this.delivered_any {
239                            // Nothing delivered yet: a live-only reattach
240                            // cannot gap, so reconnect.
241                            continue;
242                        }
243                        // Filtered/firehose streams have no resume cursor; a
244                        // reattach after delivered events would silently gap.
245                        // Surface an honest terminal Unavailable instead.
246                    }
247                    this.terminal_error = Some(error);
248                }
249                Poll::Ready(None) => {
250                    this.current = None;
251                    this.finished = true;
252                    return Poll::Ready(None);
253                }
254            }
255        }
256    }
257}
258
259/// Boxes a resuming event stream behind the public return type.
260#[must_use]
261pub fn event_stream(
262    transport: Arc<dyn WorkflowTransport>,
263    namespace: impl Into<String>,
264    target: SubscribeTarget,
265) -> EventStream {
266    Box::pin(ResumingEventStream::new(transport, namespace, target))
267}
268
269/// Boxes a per-workflow stream attaching with an explicit starting cursor.
270#[must_use]
271pub fn event_stream_from(
272    transport: Arc<dyn WorkflowTransport>,
273    namespace: impl Into<String>,
274    workflow_id: WorkflowId,
275    resume_from: NonZeroU64,
276) -> EventStream {
277    Box::pin(ResumingEventStream::from_sequence(
278        transport,
279        namespace,
280        workflow_id,
281        resume_from,
282    ))
283}
284
285fn is_retryable(error: &ClientError) -> bool {
286    matches!(error, ClientError::Unavailable { .. })
287}
288
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use aion_core::{ContentType, Event, EventEnvelope, Payload, WorkflowId};
    use aion_proto::{
        ProtoCancelResponse, ProtoDescribeWorkflowResponse, ProtoListWorkflowsResponse,
        ProtoQueryResponse, ProtoSignalResponse, ProtoStartWorkflowResponse,
    };
    use async_trait::async_trait;
    use chrono::Utc;
    use futures::StreamExt;
    use futures::stream;
    use tokio::sync::Mutex;

    use super::{ResumingEventStream, SubscribeTarget};
    use crate::error::ClientError;
    use crate::transport::{SubscriptionAttempt, WorkflowTransport};

    /// Scripted transport. Every `subscribe` call records its cursor, then
    /// pops a queued attach failure if one exists, otherwise a queued
    /// attempt. Every non-subscribe RPC fails with `Unavailable`.
    #[derive(Default)]
    struct SubscribeStub {
        /// Attach failures consumed before any queued attempt: each entry is
        /// one subscribe call that fails before a stream exists.
        attach_failures: Mutex<VecDeque<ClientError>>,
        /// Successful attaches, handed out in FIFO order once no attach
        /// failure is queued. An empty queue yields a `Server` error.
        attempts: Mutex<VecDeque<SubscriptionAttempt>>,
        /// `resume_from_sequence` of every subscribe call, in call order.
        resume_points: Mutex<Vec<Option<u64>>>,
    }

    #[async_trait]
    impl WorkflowTransport for SubscribeStub {
        async fn start_workflow(
            &self,
            _: aion_proto::ProtoStartWorkflowRequest,
        ) -> Result<ProtoStartWorkflowResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn signal(
            &self,
            _: aion_proto::ProtoSignalRequest,
        ) -> Result<ProtoSignalResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn query(
            &self,
            _: aion_proto::ProtoQueryRequest,
        ) -> Result<ProtoQueryResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn cancel(
            &self,
            _: aion_proto::ProtoCancelRequest,
        ) -> Result<ProtoCancelResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn list_workflows(
            &self,
            _: aion_proto::ProtoListWorkflowsRequest,
        ) -> Result<ProtoListWorkflowsResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn describe_workflow(
            &self,
            _: aion_proto::ProtoDescribeWorkflowRequest,
        ) -> Result<ProtoDescribeWorkflowResponse, ClientError> {
            Err(ClientError::unavailable("stub transport"))
        }

        async fn subscribe(
            &self,
            _: aion_proto::SubscriptionRequest,
            resume_from_sequence: Option<u64>,
        ) -> Result<SubscriptionAttempt, ClientError> {
            self.resume_points.lock().await.push(resume_from_sequence);
            if let Some(failure) = self.attach_failures.lock().await.pop_front() {
                return Err(failure);
            }
            self.attempts
                .lock()
                .await
                .pop_front()
                .ok_or_else(|| ClientError::server("missing subscribe attempt"))
        }
    }

    /// Builds a `WorkflowStarted` event with sequence number `seq` for
    /// `workflow_id`.
    fn event(seq: u64, workflow_id: &WorkflowId) -> Event {
        Event::WorkflowStarted {
            envelope: EventEnvelope {
                seq,
                recorded_at: Utc::now(),
                workflow_id: workflow_id.clone(),
            },
            workflow_type: String::from("checkout"),
            input: Payload::new(ContentType::Json, Vec::new()),
            run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: aion_core::PackageVersion::new("a".repeat(64)),
        }
    }

    #[tokio::test]
    async fn resumes_after_transient_disconnect_without_gaps_or_duplicates()
    -> Result<(), ClientError> {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![
                    Ok(event(1, &workflow_id)),
                    Ok(event(2, &workflow_id)),
                    Err(ClientError::unavailable("transient disconnect")),
                ])
                .boxed(),
            ));
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![
                    Ok(event(2, &workflow_id)),
                    Ok(event(3, &workflow_id)),
                    Ok(event(4, &workflow_id)),
                ])
                .boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Workflow {
                workflow_id: workflow_id.clone(),
            },
        );

        let mut seqs = Vec::new();
        while let Some(item) = events.next().await {
            // The transient disconnect must be absorbed by the reattach; `?`
            // fails the test with the offending error if it surfaces.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1, 2, 3, 4]);
        assert_eq!(*stub.resume_points.lock().await, vec![None, Some(3)]);
        Ok(())
    }

    #[tokio::test]
    async fn terminal_failure_is_yielded_before_end() {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Err(ClientError::unauthenticated("bad token"))]).boxed(),
            ));
        let mut events =
            ResumingEventStream::new(stub, "tenant-a", SubscribeTarget::Workflow { workflow_id });

        assert_eq!(
            events.next().await,
            Some(Err(ClientError::unauthenticated("bad token")))
        );
        assert_eq!(events.next().await, None);
    }

    #[tokio::test]
    async fn namespace_denied_is_terminal_and_never_retried() {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        let denied =
            ClientError::namespace_denied("namespace tenant-b is not granted to this caller");
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Err(denied.clone())]).boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-b",
            SubscribeTarget::Workflow { workflow_id },
        );

        assert_eq!(events.next().await, Some(Err(denied)));
        assert_eq!(events.next().await, None);
        assert_eq!(stub.resume_points.lock().await.len(), 1);
    }

    #[tokio::test]
    async fn from_sequence_passes_the_cursor_on_the_initial_attach() -> Result<(), ClientError> {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Ok(event(1, &workflow_id)), Ok(event(2, &workflow_id))]).boxed(),
            ));
        let Some(resume_from) = std::num::NonZeroU64::new(1) else {
            unreachable!("1 is non-zero");
        };
        let mut events = super::ResumingEventStream::from_sequence(
            stub.clone(),
            "tenant-a",
            workflow_id,
            resume_from,
        );

        let mut seqs = Vec::new();
        while let Some(item) = events.next().await {
            // No error is scripted; `?` fails the test if one surfaces.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1, 2]);
        assert_eq!(
            *stub.resume_points.lock().await,
            vec![Some(1)],
            "the initial attach must carry the explicit cursor"
        );
        Ok(())
    }

    #[tokio::test]
    async fn live_only_streams_reconnect_only_before_any_delivery() -> Result<(), ClientError> {
        // A filtered stream that drops before delivering anything may
        // reattach live-only — nothing can gap yet — and never with a cursor.
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Err(ClientError::unavailable("transient disconnect"))]).boxed(),
            ));
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Ok(event(1, &workflow_id))]).boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Filtered {
                filter: aion_core::WorkflowFilter::default(),
            },
        );

        let mut seqs = Vec::new();
        while let Some(item) = events.next().await {
            // The pre-delivery drop must be retried, not surfaced; `?` fails
            // the test with the offending error if it surfaces.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1]);
        assert_eq!(
            *stub.resume_points.lock().await,
            vec![None, None],
            "live-only streams never carry a resume cursor"
        );
        Ok(())
    }

    #[tokio::test]
    async fn live_only_disconnect_after_delivery_is_honest_unavailable() {
        // Filtered/firehose streams have no resume cursor: a transient drop
        // after >= 1 delivered event must surface Unavailable, never a silent
        // gapped reattach.
        for target in [
            SubscribeTarget::Filtered {
                filter: aion_core::WorkflowFilter::default(),
            },
            SubscribeTarget::Firehose,
        ] {
            let workflow_id = WorkflowId::new_v4();
            let stub = Arc::new(SubscribeStub::default());
            stub.attempts
                .lock()
                .await
                .push_back(SubscriptionAttempt::new(
                    stream::iter(vec![
                        Ok(event(1, &workflow_id)),
                        Err(ClientError::unavailable("transient disconnect")),
                    ])
                    .boxed(),
                ));
            let mut events = ResumingEventStream::new(stub.clone(), "tenant-a", target);

            let first = events.next().await;
            assert!(matches!(first, Some(Ok(_))), "got {first:?}");
            assert_eq!(
                events.next().await,
                Some(Err(ClientError::unavailable("transient disconnect")))
            );
            assert_eq!(events.next().await, None);
            assert_eq!(
                stub.resume_points.lock().await.len(),
                1,
                "no reattach may follow a post-delivery live-only disconnect"
            );
        }
    }

    #[tokio::test]
    async fn live_only_streams_do_not_dedupe_sequence_numbers_across_workflows()
    -> Result<(), ClientError> {
        // Per-workflow seq is the only ordering that exists; two workflows
        // legitimately share sequence numbers on a filtered/firehose stream.
        let first_workflow = WorkflowId::new_v4();
        let second_workflow = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![
                    Ok(event(1, &first_workflow)),
                    Ok(event(1, &second_workflow)),
                ])
                .boxed(),
            ));
        let mut events = ResumingEventStream::new(stub, "tenant-a", SubscribeTarget::Firehose);

        let mut delivered = Vec::new();
        while let Some(item) = events.next().await {
            // No error is scripted; `?` fails the test if one surfaces.
            delivered.push(item?.envelope().workflow_id.clone());
        }

        assert_eq!(delivered, vec![first_workflow, second_workflow]);
        Ok(())
    }

    #[tokio::test]
    async fn not_found_is_terminal_and_never_retried() {
        // A workflow-level visibility miss surfaces as NotFound (the server's
        // anti-existence-leak contract); like every non-Unavailable error it
        // must end the stream instead of reconnecting forever.
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Err(ClientError::not_found("workflow was not found"))]).boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Workflow { workflow_id },
        );

        assert_eq!(
            events.next().await,
            Some(Err(ClientError::not_found("workflow was not found")))
        );
        assert_eq!(events.next().await, None);
        assert_eq!(stub.resume_points.lock().await.len(), 1);
    }

    /// Connect-failure contract: an `Unavailable` initial attach failure is
    /// retryable exactly like a mid-stream drop — the stream re-attaches and
    /// delivers, never surfacing the transient error as terminal.
    #[tokio::test]
    async fn unavailable_attach_failure_is_retried_until_attach_succeeds() -> Result<(), ClientError>
    {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attach_failures
            .lock()
            .await
            .push_back(ClientError::unavailable("connection refused"));
        stub.attach_failures
            .lock()
            .await
            .push_back(ClientError::unavailable("connection refused"));
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Ok(event(1, &workflow_id)), Ok(event(2, &workflow_id))]).boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Workflow { workflow_id },
        );

        let mut seqs = Vec::new();
        while let Some(item) = events.next().await {
            // A transient attach failure must not surface; `?` fails the test
            // with the offending error if it does.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1, 2]);
        assert_eq!(
            *stub.resume_points.lock().await,
            vec![None, None, None],
            "every retried initial attach is still a live tail (no cursor)"
        );
        Ok(())
    }

    /// A mid-stream drop followed by an `Unavailable` reconnect failure keeps
    /// retrying with the SAME cursor until the reconnect succeeds.
    #[tokio::test]
    async fn unavailable_reconnect_failure_retries_with_the_same_cursor() -> Result<(), ClientError>
    {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![
                    Ok(event(1, &workflow_id)),
                    Err(ClientError::unavailable("transient disconnect")),
                ])
                .boxed(),
            ));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Workflow {
                workflow_id: workflow_id.clone(),
            },
        );
        let first = events.next().await;
        assert!(matches!(first, Some(Ok(_))), "got {first:?}");
        // The reconnect attempt fails transiently, then succeeds.
        stub.attach_failures
            .lock()
            .await
            .push_back(ClientError::unavailable("connection refused"));
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Ok(event(2, &workflow_id))]).boxed(),
            ));

        let mut seqs = vec![1];
        while let Some(item) = events.next().await {
            // A transient reconnect failure must not surface; `?` fails the
            // test with the offending error if it does.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1, 2]);
        assert_eq!(
            *stub.resume_points.lock().await,
            vec![None, Some(2), Some(2)],
            "the failed reconnect and the successful retry carry the same cursor"
        );
        Ok(())
    }

    /// Non-`Unavailable` attach failures are terminal immediately: an
    /// `Unauthenticated` connect rejection must never be retried.
    #[tokio::test]
    async fn non_retryable_attach_failure_is_terminal() {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attach_failures
            .lock()
            .await
            .push_back(ClientError::unauthenticated("bad token"));
        let mut events = ResumingEventStream::new(
            stub.clone(),
            "tenant-a",
            SubscribeTarget::Workflow { workflow_id },
        );

        assert_eq!(
            events.next().await,
            Some(Err(ClientError::unauthenticated("bad token")))
        );
        assert_eq!(events.next().await, None);
        assert_eq!(stub.resume_points.lock().await.len(), 1);
    }

    /// Live-only streams also retry `Unavailable` attach failures while
    /// nothing has been delivered (a live-only reattach cannot gap yet).
    #[tokio::test]
    async fn live_only_unavailable_attach_failure_is_retried_before_any_delivery()
    -> Result<(), ClientError> {
        let workflow_id = WorkflowId::new_v4();
        let stub = Arc::new(SubscribeStub::default());
        stub.attach_failures
            .lock()
            .await
            .push_back(ClientError::unavailable("connection refused"));
        stub.attempts
            .lock()
            .await
            .push_back(SubscriptionAttempt::new(
                stream::iter(vec![Ok(event(1, &workflow_id))]).boxed(),
            ));
        let mut events =
            ResumingEventStream::new(stub.clone(), "tenant-a", SubscribeTarget::Firehose);

        let mut seqs = Vec::new();
        while let Some(item) = events.next().await {
            // The transient attach failure must not surface; `?` fails the
            // test with the offending error if it does.
            seqs.push(item?.seq());
        }

        assert_eq!(seqs, vec![1]);
        assert_eq!(*stub.resume_points.lock().await, vec![None, None]);
        Ok(())
    }
}