1use std::num::NonZeroU64;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use aion_core::{Event, WorkflowFilter, WorkflowId};
9use aion_proto::{
10 FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, ProtoWorkflowId,
11 SubscriptionRequest, subscription_request,
12};
13use futures::Stream;
14use futures::future::BoxFuture;
15use futures::stream::BoxStream;
16
17use crate::error::ClientError;
18use crate::transport::{SubscriptionAttempt, WorkflowTransport};
19
20pub type EventStream = Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>;
22
/// Selects which events a subscription should deliver.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SubscribeTarget {
    /// Events of a single workflow; sent as a per-workflow subscription.
    Workflow {
        /// Identifier of the workflow to follow.
        workflow_id: WorkflowId,
    },
    /// Events of the workflows selected by `filter`; sent as a filtered subscription.
    Filtered {
        /// Supplies the workflow type and status criteria of the request.
        filter: WorkflowFilter,
    },
    /// Firehose subscription; the request carries only the namespace.
    Firehose,
}
39
40impl SubscribeTarget {
41 pub(crate) fn request(&self, namespace: &str) -> SubscriptionRequest {
42 match self {
43 Self::Workflow { workflow_id } => SubscriptionRequest {
44 subscription: Some(subscription_request::Subscription::PerWorkflow(
45 PerWorkflowSubscription {
46 namespace: namespace.to_owned(),
47 workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
48 resume_from_seq: None,
49 },
50 )),
51 },
52 Self::Filtered { filter } => SubscriptionRequest {
53 subscription: Some(subscription_request::Subscription::Filtered(
54 FilteredSubscription {
55 namespace: namespace.to_owned(),
56 workflow_type: filter.workflow_type.clone(),
57 status: filter
58 .status
59 .map(|status| aion_proto::ProtoWorkflowStatus::from(status) as i32),
60 namespace_selector: None,
61 },
62 )),
63 },
64 Self::Firehose => SubscriptionRequest {
65 subscription: Some(subscription_request::Subscription::Firehose(
66 FirehoseSubscription {
67 namespace: namespace.to_owned(),
68 },
69 )),
70 },
71 }
72 }
73}
74
/// Event stream that opens its own subscription and re-attaches after
/// retryable failures.
pub struct ResumingEventStream {
    /// Transport used to open every subscription attempt.
    transport: Arc<dyn WorkflowTransport>,
    /// Namespace placed in each subscription request.
    namespace: String,
    /// What the stream subscribes to.
    target: SubscribeTarget,
    /// Sequence number of the last event delivered on a per-workflow target
    /// (or the predecessor of an explicit resume point); never updated for
    /// other targets.
    last_seq: Option<u64>,
    /// Set once any event has been handed to the consumer.
    delivered_any: bool,
    /// Event stream of the currently attached subscription, if any.
    current: Option<BoxStream<'static, Result<Event, ClientError>>>,
    /// In-flight subscribe call, if any.
    pending_subscribe: Option<BoxFuture<'static, Result<SubscriptionAttempt, ClientError>>>,
    /// Error stashed by the event loop to be yielded as the final item.
    terminal_error: Option<ClientError>,
    /// Once set, every poll returns `None`.
    finished: bool,
}
104
105impl ResumingEventStream {
106 #[must_use]
108 pub fn new(
109 transport: Arc<dyn WorkflowTransport>,
110 namespace: impl Into<String>,
111 target: SubscribeTarget,
112 ) -> Self {
113 Self {
114 transport,
115 namespace: namespace.into(),
116 target,
117 last_seq: None,
118 delivered_any: false,
119 current: None,
120 pending_subscribe: None,
121 terminal_error: None,
122 finished: false,
123 }
124 }
125
126 #[must_use]
134 pub fn from_sequence(
135 transport: Arc<dyn WorkflowTransport>,
136 namespace: impl Into<String>,
137 workflow_id: WorkflowId,
138 resume_from: NonZeroU64,
139 ) -> Self {
140 let mut stream = Self::new(
141 transport,
142 namespace,
143 SubscribeTarget::Workflow { workflow_id },
144 );
145 stream.last_seq = Some(resume_from.get() - 1);
149 stream
150 }
151
152 fn is_per_workflow(&self) -> bool {
153 matches!(self.target, SubscribeTarget::Workflow { .. })
154 }
155
156 fn start_subscribe(&mut self) {
157 let transport = Arc::clone(&self.transport);
158 let request = self.target.request(&self.namespace);
159 let resume_from_sequence = if self.is_per_workflow() {
162 self.last_seq.map(|seq| seq.saturating_add(1))
163 } else {
164 None
165 };
166 self.pending_subscribe = Some(Box::pin(async move {
167 transport.subscribe(request, resume_from_sequence).await
168 }));
169 }
170}
171
172impl Stream for ResumingEventStream {
173 type Item = Result<Event, ClientError>;
174
175 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176 let this = self.get_mut();
177 loop {
178 if this.finished {
179 return Poll::Ready(None);
180 }
181
182 if let Some(error) = this.terminal_error.take() {
183 this.finished = true;
184 return Poll::Ready(Some(Err(error)));
185 }
186
187 if this.current.is_none() && this.pending_subscribe.is_none() {
188 this.start_subscribe();
189 }
190
191 if let Some(pending) = this.pending_subscribe.as_mut() {
192 match pending.as_mut().poll(cx) {
193 Poll::Pending => return Poll::Pending,
194 Poll::Ready(Ok(attempt)) => {
195 this.pending_subscribe = None;
196 this.current = Some(attempt.events);
197 }
198 Poll::Ready(Err(error)) => {
199 this.pending_subscribe = None;
206 if is_retryable(&error) && (this.is_per_workflow() || !this.delivered_any) {
207 continue;
208 }
209 this.finished = true;
210 return Poll::Ready(Some(Err(error)));
211 }
212 }
213 }
214
215 let Some(current) = this.current.as_mut() else {
216 continue;
217 };
218 match current.as_mut().poll_next(cx) {
219 Poll::Pending => return Poll::Pending,
220 Poll::Ready(Some(Ok(event))) => {
221 if this.is_per_workflow() {
222 if this.last_seq.is_some_and(|seq| event.seq() <= seq) {
225 continue;
226 }
227 this.last_seq = Some(event.seq());
228 }
229 this.delivered_any = true;
230 return Poll::Ready(Some(Ok(event)));
231 }
232 Poll::Ready(Some(Err(error))) => {
233 this.current = None;
234 if is_retryable(&error) {
235 if this.is_per_workflow() {
236 continue;
237 }
238 if !this.delivered_any {
239 continue;
242 }
243 }
247 this.terminal_error = Some(error);
248 }
249 Poll::Ready(None) => {
250 this.current = None;
251 this.finished = true;
252 return Poll::Ready(None);
253 }
254 }
255 }
256 }
257}
258
259#[must_use]
261pub fn event_stream(
262 transport: Arc<dyn WorkflowTransport>,
263 namespace: impl Into<String>,
264 target: SubscribeTarget,
265) -> EventStream {
266 Box::pin(ResumingEventStream::new(transport, namespace, target))
267}
268
269#[must_use]
271pub fn event_stream_from(
272 transport: Arc<dyn WorkflowTransport>,
273 namespace: impl Into<String>,
274 workflow_id: WorkflowId,
275 resume_from: NonZeroU64,
276) -> EventStream {
277 Box::pin(ResumingEventStream::from_sequence(
278 transport,
279 namespace,
280 workflow_id,
281 resume_from,
282 ))
283}
284
285fn is_retryable(error: &ClientError) -> bool {
286 matches!(error, ClientError::Unavailable { .. })
287}
288
289#[cfg(test)]
290mod tests {
291 use std::collections::VecDeque;
292 use std::sync::Arc;
293
294 use aion_core::{ContentType, Event, EventEnvelope, Payload, WorkflowId};
295 use aion_proto::{
296 ProtoCancelResponse, ProtoDescribeWorkflowResponse, ProtoListWorkflowsResponse,
297 ProtoQueryResponse, ProtoSignalResponse, ProtoStartWorkflowResponse,
298 };
299 use async_trait::async_trait;
300 use chrono::Utc;
301 use futures::StreamExt;
302 use futures::stream;
303 use tokio::sync::Mutex;
304
305 use super::{ResumingEventStream, SubscribeTarget};
306 use crate::error::ClientError;
307 use crate::transport::{SubscriptionAttempt, WorkflowTransport};
308
/// Scripted transport double used by the subscription tests.
#[derive(Default)]
struct SubscribeStub {
    /// Errors returned by upcoming `subscribe` calls, taken front-first
    /// before any scripted attempt is handed out.
    attach_failures: Mutex<VecDeque<ClientError>>,
    /// Successful attach results, handed out front-first when no failure is queued.
    attempts: Mutex<VecDeque<SubscriptionAttempt>>,
    /// Resume cursor recorded for every `subscribe` call, in call order.
    resume_points: Mutex<Vec<Option<u64>>>,
}
317
318 #[async_trait]
319 impl WorkflowTransport for SubscribeStub {
320 async fn start_workflow(
321 &self,
322 _: aion_proto::ProtoStartWorkflowRequest,
323 ) -> Result<ProtoStartWorkflowResponse, ClientError> {
324 Err(ClientError::unavailable("stub transport"))
325 }
326
327 async fn signal(
328 &self,
329 _: aion_proto::ProtoSignalRequest,
330 ) -> Result<ProtoSignalResponse, ClientError> {
331 Err(ClientError::unavailable("stub transport"))
332 }
333
334 async fn query(
335 &self,
336 _: aion_proto::ProtoQueryRequest,
337 ) -> Result<ProtoQueryResponse, ClientError> {
338 Err(ClientError::unavailable("stub transport"))
339 }
340
341 async fn cancel(
342 &self,
343 _: aion_proto::ProtoCancelRequest,
344 ) -> Result<ProtoCancelResponse, ClientError> {
345 Err(ClientError::unavailable("stub transport"))
346 }
347
348 async fn list_workflows(
349 &self,
350 _: aion_proto::ProtoListWorkflowsRequest,
351 ) -> Result<ProtoListWorkflowsResponse, ClientError> {
352 Err(ClientError::unavailable("stub transport"))
353 }
354
355 async fn describe_workflow(
356 &self,
357 _: aion_proto::ProtoDescribeWorkflowRequest,
358 ) -> Result<ProtoDescribeWorkflowResponse, ClientError> {
359 Err(ClientError::unavailable("stub transport"))
360 }
361
362 async fn subscribe(
363 &self,
364 _: aion_proto::SubscriptionRequest,
365 resume_from_sequence: Option<u64>,
366 ) -> Result<SubscriptionAttempt, ClientError> {
367 self.resume_points.lock().await.push(resume_from_sequence);
368 if let Some(failure) = self.attach_failures.lock().await.pop_front() {
369 return Err(failure);
370 }
371 self.attempts
372 .lock()
373 .await
374 .pop_front()
375 .ok_or_else(|| ClientError::server("missing subscribe attempt"))
376 }
377 }
378
379 fn event(seq: u64, workflow_id: &WorkflowId) -> Event {
380 Event::WorkflowStarted {
381 envelope: EventEnvelope {
382 seq,
383 recorded_at: Utc::now(),
384 workflow_id: workflow_id.clone(),
385 },
386 workflow_type: String::from("checkout"),
387 input: Payload::new(ContentType::Json, Vec::new()),
388 run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
389 parent_run_id: None,
390 package_version: aion_core::PackageVersion::new("a".repeat(64)),
391 }
392 }
393
394 #[tokio::test]
395 async fn resumes_after_transient_disconnect_without_gaps_or_duplicates() {
396 let workflow_id = WorkflowId::new_v4();
397 let stub = Arc::new(SubscribeStub::default());
398 stub.attempts
399 .lock()
400 .await
401 .push_back(SubscriptionAttempt::new(
402 stream::iter(vec![
403 Ok(event(1, &workflow_id)),
404 Ok(event(2, &workflow_id)),
405 Err(ClientError::unavailable("transient disconnect")),
406 ])
407 .boxed(),
408 ));
409 stub.attempts
410 .lock()
411 .await
412 .push_back(SubscriptionAttempt::new(
413 stream::iter(vec![
414 Ok(event(2, &workflow_id)),
415 Ok(event(3, &workflow_id)),
416 Ok(event(4, &workflow_id)),
417 ])
418 .boxed(),
419 ));
420 let mut events = ResumingEventStream::new(
421 stub.clone(),
422 "tenant-a",
423 SubscribeTarget::Workflow {
424 workflow_id: workflow_id.clone(),
425 },
426 );
427
428 let mut seqs = Vec::new();
429 while let Some(item) = events.next().await {
430 let event = item
431 .map_err(|e| format!("unexpected stream error: {e}"))
432 .ok();
433 if let Some(event) = event {
434 seqs.push(event.seq());
435 }
436 }
437
438 assert_eq!(seqs, vec![1, 2, 3, 4]);
439 assert_eq!(*stub.resume_points.lock().await, vec![None, Some(3)]);
440 }
441
442 #[tokio::test]
443 async fn terminal_failure_is_yielded_before_end() {
444 let workflow_id = WorkflowId::new_v4();
445 let stub = Arc::new(SubscribeStub::default());
446 stub.attempts
447 .lock()
448 .await
449 .push_back(SubscriptionAttempt::new(
450 stream::iter(vec![Err(ClientError::unauthenticated("bad token"))]).boxed(),
451 ));
452 let mut events =
453 ResumingEventStream::new(stub, "tenant-a", SubscribeTarget::Workflow { workflow_id });
454
455 assert_eq!(
456 events.next().await,
457 Some(Err(ClientError::unauthenticated("bad token")))
458 );
459 assert_eq!(events.next().await, None);
460 }
461
462 #[tokio::test]
463 async fn namespace_denied_is_terminal_and_never_retried() {
464 let workflow_id = WorkflowId::new_v4();
465 let stub = Arc::new(SubscribeStub::default());
466 let denied =
467 ClientError::namespace_denied("namespace tenant-b is not granted to this caller");
468 stub.attempts
469 .lock()
470 .await
471 .push_back(SubscriptionAttempt::new(
472 stream::iter(vec![Err(denied.clone())]).boxed(),
473 ));
474 let mut events = ResumingEventStream::new(
475 stub.clone(),
476 "tenant-b",
477 SubscribeTarget::Workflow { workflow_id },
478 );
479
480 assert_eq!(events.next().await, Some(Err(denied)));
481 assert_eq!(events.next().await, None);
482 assert_eq!(stub.resume_points.lock().await.len(), 1);
483 }
484
485 #[tokio::test]
486 async fn from_sequence_passes_the_cursor_on_the_initial_attach() {
487 let workflow_id = WorkflowId::new_v4();
488 let stub = Arc::new(SubscribeStub::default());
489 stub.attempts
490 .lock()
491 .await
492 .push_back(SubscriptionAttempt::new(
493 stream::iter(vec![Ok(event(1, &workflow_id)), Ok(event(2, &workflow_id))]).boxed(),
494 ));
495 let Some(resume_from) = std::num::NonZeroU64::new(1) else {
496 unreachable!("1 is non-zero");
497 };
498 let mut events = super::ResumingEventStream::from_sequence(
499 stub.clone(),
500 "tenant-a",
501 workflow_id,
502 resume_from,
503 );
504
505 let mut seqs = Vec::new();
506 while let Some(item) = events.next().await {
507 if let Ok(event) = item {
508 seqs.push(event.seq());
509 }
510 }
511
512 assert_eq!(seqs, vec![1, 2]);
513 assert_eq!(
514 *stub.resume_points.lock().await,
515 vec![Some(1)],
516 "the initial attach must carry the explicit cursor"
517 );
518 }
519
520 #[tokio::test]
521 async fn live_only_streams_reconnect_only_before_any_delivery() {
522 let workflow_id = WorkflowId::new_v4();
525 let stub = Arc::new(SubscribeStub::default());
526 stub.attempts
527 .lock()
528 .await
529 .push_back(SubscriptionAttempt::new(
530 stream::iter(vec![Err(ClientError::unavailable("transient disconnect"))]).boxed(),
531 ));
532 stub.attempts
533 .lock()
534 .await
535 .push_back(SubscriptionAttempt::new(
536 stream::iter(vec![Ok(event(1, &workflow_id))]).boxed(),
537 ));
538 let mut events = ResumingEventStream::new(
539 stub.clone(),
540 "tenant-a",
541 SubscribeTarget::Filtered {
542 filter: aion_core::WorkflowFilter::default(),
543 },
544 );
545
546 let mut seqs = Vec::new();
547 while let Some(item) = events.next().await {
548 if let Ok(event) = item {
549 seqs.push(event.seq());
550 }
551 }
552
553 assert_eq!(seqs, vec![1]);
554 assert_eq!(
555 *stub.resume_points.lock().await,
556 vec![None, None],
557 "live-only streams never carry a resume cursor"
558 );
559 }
560
561 #[tokio::test]
562 async fn live_only_disconnect_after_delivery_is_honest_unavailable() {
563 for target in [
567 SubscribeTarget::Filtered {
568 filter: aion_core::WorkflowFilter::default(),
569 },
570 SubscribeTarget::Firehose,
571 ] {
572 let workflow_id = WorkflowId::new_v4();
573 let stub = Arc::new(SubscribeStub::default());
574 stub.attempts
575 .lock()
576 .await
577 .push_back(SubscriptionAttempt::new(
578 stream::iter(vec![
579 Ok(event(1, &workflow_id)),
580 Err(ClientError::unavailable("transient disconnect")),
581 ])
582 .boxed(),
583 ));
584 let mut events = ResumingEventStream::new(stub.clone(), "tenant-a", target);
585
586 let first = events.next().await;
587 assert!(matches!(first, Some(Ok(_))), "got {first:?}");
588 assert_eq!(
589 events.next().await,
590 Some(Err(ClientError::unavailable("transient disconnect")))
591 );
592 assert_eq!(events.next().await, None);
593 assert_eq!(
594 stub.resume_points.lock().await.len(),
595 1,
596 "no reattach may follow a post-delivery live-only disconnect"
597 );
598 }
599 }
600
601 #[tokio::test]
602 async fn live_only_streams_do_not_dedupe_sequence_numbers_across_workflows() {
603 let first_workflow = WorkflowId::new_v4();
606 let second_workflow = WorkflowId::new_v4();
607 let stub = Arc::new(SubscribeStub::default());
608 stub.attempts
609 .lock()
610 .await
611 .push_back(SubscriptionAttempt::new(
612 stream::iter(vec![
613 Ok(event(1, &first_workflow)),
614 Ok(event(1, &second_workflow)),
615 ])
616 .boxed(),
617 ));
618 let mut events = ResumingEventStream::new(stub, "tenant-a", SubscribeTarget::Firehose);
619
620 let mut delivered = Vec::new();
621 while let Some(item) = events.next().await {
622 if let Ok(event) = item {
623 delivered.push(event.envelope().workflow_id.clone());
624 }
625 }
626
627 assert_eq!(delivered, vec![first_workflow, second_workflow]);
628 }
629
630 #[tokio::test]
631 async fn not_found_is_terminal_and_never_retried() {
632 let workflow_id = WorkflowId::new_v4();
636 let stub = Arc::new(SubscribeStub::default());
637 stub.attempts
638 .lock()
639 .await
640 .push_back(SubscriptionAttempt::new(
641 stream::iter(vec![Err(ClientError::not_found("workflow was not found"))]).boxed(),
642 ));
643 let mut events = ResumingEventStream::new(
644 stub.clone(),
645 "tenant-a",
646 SubscribeTarget::Workflow { workflow_id },
647 );
648
649 assert_eq!(
650 events.next().await,
651 Some(Err(ClientError::not_found("workflow was not found")))
652 );
653 assert_eq!(events.next().await, None);
654 assert_eq!(stub.resume_points.lock().await.len(), 1);
655 }
656
657 #[tokio::test]
661 async fn unavailable_attach_failure_is_retried_until_attach_succeeds() -> Result<(), ClientError>
662 {
663 let workflow_id = WorkflowId::new_v4();
664 let stub = Arc::new(SubscribeStub::default());
665 stub.attach_failures
666 .lock()
667 .await
668 .push_back(ClientError::unavailable("connection refused"));
669 stub.attach_failures
670 .lock()
671 .await
672 .push_back(ClientError::unavailable("connection refused"));
673 stub.attempts
674 .lock()
675 .await
676 .push_back(SubscriptionAttempt::new(
677 stream::iter(vec![Ok(event(1, &workflow_id)), Ok(event(2, &workflow_id))]).boxed(),
678 ));
679 let mut events = ResumingEventStream::new(
680 stub.clone(),
681 "tenant-a",
682 SubscribeTarget::Workflow { workflow_id },
683 );
684
685 let mut seqs = Vec::new();
686 while let Some(item) = events.next().await {
687 seqs.push(item?.seq());
690 }
691
692 assert_eq!(seqs, vec![1, 2]);
693 assert_eq!(
694 *stub.resume_points.lock().await,
695 vec![None, None, None],
696 "every retried initial attach is still a live tail (no cursor)"
697 );
698 Ok(())
699 }
700
701 #[tokio::test]
704 async fn unavailable_reconnect_failure_retries_with_the_same_cursor() -> Result<(), ClientError>
705 {
706 let workflow_id = WorkflowId::new_v4();
707 let stub = Arc::new(SubscribeStub::default());
708 stub.attempts
709 .lock()
710 .await
711 .push_back(SubscriptionAttempt::new(
712 stream::iter(vec![
713 Ok(event(1, &workflow_id)),
714 Err(ClientError::unavailable("transient disconnect")),
715 ])
716 .boxed(),
717 ));
718 let mut events = ResumingEventStream::new(
719 stub.clone(),
720 "tenant-a",
721 SubscribeTarget::Workflow {
722 workflow_id: workflow_id.clone(),
723 },
724 );
725 let first = events.next().await;
726 assert!(matches!(first, Some(Ok(_))), "got {first:?}");
727 stub.attach_failures
729 .lock()
730 .await
731 .push_back(ClientError::unavailable("connection refused"));
732 stub.attempts
733 .lock()
734 .await
735 .push_back(SubscriptionAttempt::new(
736 stream::iter(vec![Ok(event(2, &workflow_id))]).boxed(),
737 ));
738
739 let mut seqs = vec![1];
740 while let Some(item) = events.next().await {
741 seqs.push(item?.seq());
744 }
745
746 assert_eq!(seqs, vec![1, 2]);
747 assert_eq!(
748 *stub.resume_points.lock().await,
749 vec![None, Some(2), Some(2)],
750 "the failed reconnect and the successful retry carry the same cursor"
751 );
752 Ok(())
753 }
754
755 #[tokio::test]
758 async fn non_retryable_attach_failure_is_terminal() {
759 let workflow_id = WorkflowId::new_v4();
760 let stub = Arc::new(SubscribeStub::default());
761 stub.attach_failures
762 .lock()
763 .await
764 .push_back(ClientError::unauthenticated("bad token"));
765 let mut events = ResumingEventStream::new(
766 stub.clone(),
767 "tenant-a",
768 SubscribeTarget::Workflow { workflow_id },
769 );
770
771 assert_eq!(
772 events.next().await,
773 Some(Err(ClientError::unauthenticated("bad token")))
774 );
775 assert_eq!(events.next().await, None);
776 assert_eq!(stub.resume_points.lock().await.len(), 1);
777 }
778
779 #[tokio::test]
782 async fn live_only_unavailable_attach_failure_is_retried_before_any_delivery() {
783 let workflow_id = WorkflowId::new_v4();
784 let stub = Arc::new(SubscribeStub::default());
785 stub.attach_failures
786 .lock()
787 .await
788 .push_back(ClientError::unavailable("connection refused"));
789 stub.attempts
790 .lock()
791 .await
792 .push_back(SubscriptionAttempt::new(
793 stream::iter(vec![Ok(event(1, &workflow_id))]).boxed(),
794 ));
795 let mut events =
796 ResumingEventStream::new(stub.clone(), "tenant-a", SubscribeTarget::Firehose);
797
798 let mut seqs = Vec::new();
799 while let Some(item) = events.next().await {
800 if let Ok(event) = item {
801 seqs.push(event.seq());
802 }
803 }
804
805 assert_eq!(seqs, vec![1]);
806 assert_eq!(*stub.resume_points.lock().await, vec![None, None]);
807 }
808}