1use crate::{
7 error::{ErrorKind, ErrorResponse},
8 http::{
9 headers::{HeaderName, Headers},
10 policies::create_public_api_span,
11 Context, Format, JsonFormat, Response, StatusCode, Url,
12 },
13 sleep,
14 time::{Duration, OffsetDateTime},
15 tracing::{Span, SpanStatus},
16};
17use futures::{channel::oneshot, stream::unfold, Stream, StreamExt};
18use serde::Deserialize;
19use std::{
20 convert::Infallible,
21 fmt,
22 future::{Future, IntoFuture},
23 pin::Pin,
24 str::FromStr,
25 sync::Arc,
26 task::{Context as TaskContext, Poll},
27};
28
29const DEFAULT_RETRY_TIME: Duration = Duration::seconds(30);
34
35const MIN_RETRY_TIME: Duration = Duration::seconds(1);
37
38#[derive(Debug, Clone, Default, PartialEq, Eq)]
40pub enum PollerState {
41 #[default]
43 Initial,
44 More(PollerContinuation),
46}
47
48#[derive(Debug, Default, Clone, PartialEq, Eq)]
50pub enum PollerStatus {
51 #[default]
53 InProgress,
54
55 Succeeded,
57
58 Failed,
60
61 Canceled,
63
64 UnknownValue(String),
66}
67
68impl From<&str> for PollerStatus {
69 fn from(value: &str) -> Self {
70 if "inprogress".eq_ignore_ascii_case(value) {
75 return PollerStatus::InProgress;
76 }
77
78 if "succeeded".eq_ignore_ascii_case(value) {
79 return PollerStatus::Succeeded;
80 }
81
82 if "failed".eq_ignore_ascii_case(value) {
83 return PollerStatus::Failed;
84 }
85
86 if "canceled".eq_ignore_ascii_case(value) || "cancelled".eq_ignore_ascii_case(value) {
89 return PollerStatus::Canceled;
90 }
91
92 PollerStatus::UnknownValue(value.to_owned())
93 }
94}
95
96impl FromStr for PollerStatus {
97 type Err = Infallible;
98 fn from_str(value: &str) -> Result<Self, Self::Err> {
99 Ok(value.into())
100 }
101}
102
103impl<'de> Deserialize<'de> for PollerStatus {
104 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
105 where
106 D: serde::Deserializer<'de>,
107 {
108 struct PollerStatusVisitor;
109 impl serde::de::Visitor<'_> for PollerStatusVisitor {
110 type Value = PollerStatus;
111
112 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
113 formatter.write_str("a string representing a PollerStatus")
114 }
115
116 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
117 where
118 E: serde::de::Error,
119 {
120 FromStr::from_str(s).map_err(serde::de::Error::custom)
121 }
122 }
123
124 deserializer.deserialize_str(PollerStatusVisitor)
125 }
126}
127
128#[derive(Debug, Clone)]
130pub struct PollerOptions<'a> {
131 pub context: Context<'a>,
133 pub frequency: Duration,
137}
138
139impl Default for PollerOptions<'_> {
140 fn default() -> Self {
141 Self {
142 frequency: DEFAULT_RETRY_TIME,
143 context: Context::new(),
144 }
145 }
146}
147
148impl<'a> PollerOptions<'a> {
149 #[must_use]
151 pub fn into_owned(self) -> PollerOptions<'static> {
152 PollerOptions {
153 context: self.context.into_owned(),
154 frequency: self.frequency,
155 }
156 }
157}
158
159pub enum PollerResult<M, F = JsonFormat>
161where
162 M: StatusMonitor,
163 F: Format,
164{
165 InProgress {
173 response: Response<M, F>,
175 retry_after: Duration,
177 continuation: PollerContinuation,
179 },
180
181 Done {
187 response: Response<M, F>,
189 },
190
191 Succeeded {
198 response: Response<M, F>,
200 target: BoxedCallback<M>,
202 },
203}
204
205impl<M, F> fmt::Debug for PollerResult<M, F>
206where
207 M: StatusMonitor,
208 F: Format,
209{
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 match self {
212 Self::InProgress {
213 retry_after,
214 continuation,
215 ..
216 } => f
217 .debug_struct("InProgress")
218 .field("retry_after", &retry_after)
219 .field("continuation", &continuation)
220 .finish_non_exhaustive(),
221 Self::Done { .. } => f.debug_struct("Done").finish_non_exhaustive(),
222 Self::Succeeded { .. } => f.debug_struct("Succeeded").finish_non_exhaustive(),
223 }
224 }
225}
226
227#[derive(Clone, Debug, PartialEq, Eq)]
229#[non_exhaustive]
230pub enum PollerContinuation {
231 Links {
233 next_link: Url,
235
236 final_link: Option<Url>,
240 },
241}
242
243impl fmt::Display for PollerContinuation {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 match self {
246 PollerContinuation::Links { next_link, .. } => f.write_str(next_link.as_str()),
247 }
248 }
249}
250
251pub trait StatusMonitor {
253 type Output;
257
258 type Format: Format + Send;
262
263 fn status(&self) -> PollerStatus;
265}
266
267mod types {
268 use super::{PollerResult, Response, StatusMonitor, Stream};
269 use std::{future::Future, pin::Pin};
270
271 pub type BoxedStream<M, F> = Box<dyn Stream<Item = crate::Result<Response<M, F>>> + Send>;
272 pub type BoxedFuture<M> = Box<
273 dyn Future<
274 Output = crate::Result<
275 Response<<M as StatusMonitor>::Output, <M as StatusMonitor>::Format>,
276 >,
277 > + Send,
278 >;
279 pub type BoxedCallback<M> = Box<dyn FnOnce() -> Pin<BoxedFuture<M>> + Send>;
280
281 pub type PollerResultFuture<M, F> =
283 Pin<Box<dyn Future<Output = crate::Result<PollerResult<M, F>>> + Send + 'static>>;
284}
285
286pub use types::PollerResultFuture;
287use types::{BoxedCallback, BoxedFuture, BoxedStream};
288
289#[pin_project::pin_project]
350pub struct Poller<M, F = JsonFormat>
351where
352 M: StatusMonitor,
353 F: Format,
354{
355 #[pin]
356 stream: Pin<BoxedStream<M, F>>,
357 target: Option<BoxedFuture<M>>,
358}
359
360impl<M, F> Poller<M, F>
361where
362 M: StatusMonitor,
363 F: Format + Send,
364{
365 pub fn new<Fun>(make_request: Fun, options: Option<PollerOptions<'static>>) -> Self
466 where
467 M: Send + 'static,
468 M::Output: Send + 'static,
469 M::Format: Send + 'static,
470 Fun: Fn(PollerState, PollerOptions<'static>) -> PollerResultFuture<M, F> + Send + 'static,
471 {
472 let options = options.unwrap_or_default();
473 let (stream, target) = create_poller_stream(make_request, options);
474 Self {
475 stream: Box::pin(stream),
476 target: Some(target),
477 }
478 }
479}
480
481impl<M, F> Stream for Poller<M, F>
482where
483 M: StatusMonitor,
484 F: Format,
485{
486 type Item = crate::Result<Response<M, F>>;
487
488 fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
489 let state = self.project().stream.poll_next(cx);
490 if let Poll::Ready(Some(Ok(ref response))) = state {
491 check_status_code(response)?;
492 }
493
494 state
495 }
496}
497
498impl<M, F> IntoFuture for Poller<M, F>
499where
500 M: StatusMonitor + 'static,
501 M::Output: Send + 'static,
502 M::Format: Send + 'static,
503 F: Format + 'static,
504{
505 type Output = crate::Result<Response<M::Output, M::Format>>;
506
507 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
508
509 fn into_future(mut self) -> Self::IntoFuture {
510 Box::pin(async move {
511 while let Some(result) = self.stream.next().await {
513 result?;
515 }
516
517 let target = self.target.ok_or_else(|| {
519 crate::Error::new(
520 ErrorKind::Other,
521 "poller completed without a target response",
522 )
523 })?;
524
525 Box::into_pin(target).await
527 })
528 }
529}
530
531impl<M, F> fmt::Debug for Poller<M, F>
532where
533 M: StatusMonitor,
534 F: Format,
535{
536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537 f.write_str("Poller")
538 }
539}
540#[derive(Debug, Clone, PartialEq, Eq)]
541enum State {
542 Init,
543 InProgress(PollerContinuation),
544 Done,
545}
546
547type TargetTransmitterType<'a, M> = (Pin<BoxedFuture<M>>, Option<Context<'a>>);
549
550struct StreamState<'a, M, Fun>
552where
553 M: StatusMonitor,
554{
555 state: State,
557 make_request: Fun,
559 target_tx: Option<oneshot::Sender<TargetTransmitterType<'a, M>>>,
561 options: PollerOptions<'a>,
563 added_span: bool,
565}
566
567fn create_poller_stream<
568 M,
569 F: Format,
570 Fun: Fn(PollerState, PollerOptions<'static>) -> PollerResultFuture<M, F> + Send + 'static,
571>(
572 make_request: Fun,
573 options: PollerOptions<'static>,
574) -> (
575 impl Stream<Item = crate::Result<Response<M, F>>> + 'static,
576 BoxedFuture<M>,
577)
578where
579 M: StatusMonitor + 'static,
580 M::Output: Send + 'static,
581 M::Format: Send + 'static,
582{
583 let (target_tx, target_rx) = oneshot::channel();
584
585 assert!(
586 options.frequency >= MIN_RETRY_TIME,
587 "minimum polling frequency is 1 second"
588 );
589 let stream = unfold(
590 StreamState::<M, Fun> {
592 state: State::Init,
593 make_request,
594 target_tx: Some(target_tx),
595 options,
596 added_span: false,
597 },
598 move |mut poller_stream_state| async move {
599 let result = match poller_stream_state.state {
600 State::Init => {
601 let span =
603 create_public_api_span(&poller_stream_state.options.context, None, None);
604 if let Some(ref s) = span {
605 poller_stream_state.added_span = true;
606 poller_stream_state.options.context =
607 poller_stream_state.options.context.with_value(s.clone());
608 }
609 (poller_stream_state.make_request)(
610 PollerState::Initial,
611 poller_stream_state.options.clone(),
612 )
613 .await
614 }
615 State::InProgress(continuation) => {
616 tracing::debug!(
617 "subsequent operation request to {:?}",
618 continuation.to_string()
619 );
620 (poller_stream_state.make_request)(
621 PollerState::More(continuation),
622 poller_stream_state.options.clone(),
623 )
624 .await
625 }
626 State::Done => {
627 tracing::debug!("done");
628 return None;
629 }
630 };
631 let (item, next_state) = match result {
632 Err(e) => {
633 if poller_stream_state.added_span {
634 if let Some(span) =
635 poller_stream_state.options.context.value::<Arc<dyn Span>>()
636 {
637 span.set_status(SpanStatus::Error {
639 description: e.to_string(),
640 });
641 span.set_attribute("error.type", e.kind().to_string().into());
642 span.end();
643 }
644 }
645
646 poller_stream_state.state = State::Done;
647 return Some((Err(e), poller_stream_state));
648 }
649 Ok(PollerResult::InProgress {
650 response,
651 retry_after,
652 continuation: n,
653 }) => {
654 tracing::trace!("retry poller in {}s", retry_after.whole_seconds());
657 sleep(retry_after).await;
658
659 (Ok(response), State::InProgress(n))
660 }
661 Ok(PollerResult::Done { response }) => (Ok(response), State::Done),
665 Ok(PollerResult::Succeeded {
666 response,
667 target: get_target,
668 }) => {
669 if let Some(tx) = poller_stream_state.target_tx.take() {
671 let _ = tx.send((
672 get_target(),
673 if poller_stream_state.added_span {
674 Some(poller_stream_state.options.context.clone())
675 } else {
676 None
677 },
678 ));
679 }
680 poller_stream_state.state = State::Done;
682 return Some((Ok(response), poller_stream_state));
683 }
684 };
685
686 poller_stream_state.state = next_state;
688 Some((item, poller_stream_state))
689 },
690 );
691
692 let target = Box::new(async move {
693 match target_rx.await {
694 Ok(target_state) => {
695 let res = target_state.0.await;
697 if let Some(ctx) = target_state.1 {
700 match &res {
701 Ok(response) => {
702 if let Some(span) = ctx.value::<Arc<dyn Span>>() {
705 if response.status().is_server_error() {
708 span.set_status(SpanStatus::Error {
709 description: "".to_string(),
710 });
711 }
712 if response.status().is_client_error()
713 || response.status().is_server_error()
714 {
715 span.set_attribute(
716 "error.type",
717 response.status().to_string().into(),
718 );
719 }
720
721 span.end();
722 }
723 }
724 Err(err) => {
725 if let Some(span) = ctx.value::<Arc<dyn Span>>() {
726 span.set_status(SpanStatus::Error {
727 description: err.to_string(),
728 });
729 span.set_attribute("error.type", err.kind().to_string().into());
730 span.end();
731 }
732 }
733 }
734 }
735 res
736 }
737 Err(err) => Err(crate::Error::with_error(
738 ErrorKind::Other,
739 err,
740 "poller completed without defining a target",
741 )),
742 }
743 });
744
745 (stream, target)
746}
747
748pub fn get_retry_after(
750 headers: &Headers,
751 retry_headers: &[HeaderName],
752 options: &PollerOptions,
753) -> Duration {
754 #[cfg_attr(feature = "test", allow(unused_mut))]
755 let duration =
756 crate::http::policies::get_retry_after(headers, OffsetDateTime::now_utc, retry_headers)
757 .unwrap_or(options.frequency);
758
759 #[cfg(feature = "test")]
760 {
761 use crate::test::RecordingMode;
762
763 if matches!(headers.get_optional::<RecordingMode>(), Ok(Some(mode)) if mode == RecordingMode::Playback)
766 {
767 if duration > Duration::ZERO {
768 tracing::debug!(
769 "overriding {}s poller retry in playback",
770 duration.whole_seconds()
771 );
772 }
773
774 return Duration::ZERO;
775 }
776 }
777
778 duration
779}
780
781fn check_status_code<T, F: Format>(response: &Response<T, F>) -> crate::Result<()> {
782 let status = response.status();
783 match status {
784 StatusCode::Ok | StatusCode::Accepted | StatusCode::Created | StatusCode::NoContent => {
785 Ok(())
786 }
787 _ => {
788 let raw_response = Box::new(response.to_raw_response());
790 let error_code = F::deserialize(raw_response.body())
791 .ok()
792 .and_then(|err: ErrorResponse| err.error)
793 .and_then(|details| details.code);
794 Err(ErrorKind::HttpResponse {
795 status,
796 error_code,
797 raw_response: Some(raw_response),
798 }
799 .into_error())
800 }
801 }
802}
803
804#[cfg(test)]
805mod tests {
806 use super::*;
807 #[cfg(feature = "xml")]
808 use crate::http::XmlFormat;
809 use crate::http::{
810 headers::Headers, AsyncRawResponse, HttpClient, Method, NoFormat, RawResponse, Request,
811 };
812 use azure_core_test::http::MockHttpClient;
813 use futures::{FutureExt as _, TryStreamExt as _};
814 use std::sync::{Arc, Mutex};
815
816 #[derive(Debug, serde::Deserialize)]
817 struct TestStatus {
818 status: String,
819 #[serde(default)]
820 target: Option<String>,
821 }
822
823 #[derive(Debug, serde::Deserialize)]
824 struct TestOutput {
825 #[serde(default)]
826 id: Option<String>,
827 #[serde(default)]
828 name: Option<String>,
829 }
830
831 impl StatusMonitor for TestStatus {
832 type Output = TestOutput;
833 type Format = JsonFormat;
834
835 fn status(&self) -> PollerStatus {
836 self.status.parse().unwrap_or_default()
837 }
838 }
839
840 #[cfg(feature = "xml")]
841 #[derive(Debug, serde::Deserialize)]
842 struct XmlTestStatus {
843 status: String,
844 }
845
846 #[cfg(feature = "xml")]
847 impl StatusMonitor for XmlTestStatus {
848 type Output = TestOutput;
849 type Format = XmlFormat;
850
851 fn status(&self) -> PollerStatus {
852 self.status.parse().unwrap_or_default()
853 }
854 }
855
856 #[tokio::test]
857 async fn poller_succeeded() {
858 let call_count = Arc::new(Mutex::new(0));
859
860 let mock_client = {
861 let call_count = call_count.clone();
862 Arc::new(MockHttpClient::new(move |_| {
863 let call_count = call_count.clone();
864 async move {
865 let mut count = call_count.lock().unwrap();
866 *count += 1;
867
868 if *count == 1 {
869 Ok(AsyncRawResponse::from_bytes(
871 StatusCode::Created,
872 Headers::new(),
873 br#"{"status":"InProgress"}"#.to_vec(),
874 ))
875 } else {
876 Ok(AsyncRawResponse::from_bytes(
878 StatusCode::Ok,
879 Headers::new(),
880 br#"{"status":"Succeeded"}"#.to_vec(),
881 ))
882 }
883 }
884 .boxed()
885 }))
886 };
887
888 let mut poller = Poller::new(
889 move |_, _| {
890 let client = mock_client.clone();
891 Box::pin(async move {
892 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
893 let raw_response = client.execute_request(&req).await?;
894 let (status, headers, body) = raw_response.deconstruct();
895 let bytes = body.collect().await?;
896
897 let test_status: TestStatus = crate::json::from_json(&bytes)?;
898 let response: Response<TestStatus> =
899 RawResponse::from_bytes(status, headers, bytes).into();
900
901 match test_status.status() {
902 PollerStatus::InProgress => Ok(PollerResult::InProgress {
903 response,
904 retry_after: Duration::ZERO,
905 continuation: PollerContinuation::Links {
906 next_link: req.url().clone(),
907 final_link: None,
908 },
909 }),
910 _ => Ok(PollerResult::Done { response }),
911 }
912 })
913 },
914 None,
915 );
916
917 let first_result = poller.next().await;
919 assert!(first_result.is_some());
920 let first_response = first_result.unwrap().unwrap();
921 assert_eq!(first_response.status(), StatusCode::Created);
922 let first_body = first_response.into_model().unwrap();
923 assert_eq!(first_body.status(), PollerStatus::InProgress);
924
925 let second_result = poller.next().await;
927 assert!(second_result.is_some());
928 let second_response = second_result.unwrap().unwrap();
929 assert_eq!(second_response.status(), StatusCode::Ok);
930 let second_body = second_response.into_model().unwrap();
931 assert_eq!(second_body.status(), PollerStatus::Succeeded);
932
933 let third_result = poller.next().await;
935 assert!(third_result.is_none());
936
937 assert_eq!(*call_count.lock().unwrap(), 2);
939 }
940
941 #[tokio::test]
942 async fn poller_failed() {
943 let call_count = Arc::new(Mutex::new(0));
944
945 let mock_client = {
946 let call_count = call_count.clone();
947 Arc::new(MockHttpClient::new(move |_| {
948 let call_count = call_count.clone();
949 async move {
950 let mut count = call_count.lock().unwrap();
951 *count += 1;
952
953 if *count == 1 {
954 Ok(AsyncRawResponse::from_bytes(
956 StatusCode::Created,
957 Headers::new(),
958 br#"{"status":"InProgress"}"#.to_vec(),
959 ))
960 } else {
961 Ok(AsyncRawResponse::from_bytes(
963 StatusCode::Ok,
964 Headers::new(),
965 br#"{"status":"Failed"}"#.to_vec(),
966 ))
967 }
968 }
969 .boxed()
970 }))
971 };
972 let mut poller = Poller::new(
973 move |_, _| {
974 let client = mock_client.clone();
975 Box::pin(async move {
976 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
977 let raw_response = client
978 .execute_request(&req)
979 .await?
980 .try_into_raw_response()
981 .await?;
982 let (status, headers, body) = raw_response.deconstruct();
983
984 let test_status: TestStatus = crate::json::from_json(&body)?;
985 let response: Response<TestStatus> =
986 RawResponse::from_bytes(status, headers, body).into();
987
988 match test_status.status() {
989 PollerStatus::InProgress => Ok(PollerResult::InProgress {
990 response,
991 retry_after: Duration::ZERO,
992 continuation: PollerContinuation::Links {
993 next_link: req.url().clone(),
994 final_link: None,
995 },
996 }),
997 _ => Ok(PollerResult::Done { response }),
998 }
999 })
1000 },
1001 None,
1002 );
1003
1004 let first_result = poller.next().await;
1006 assert!(first_result.is_some());
1007 let first_response = first_result.unwrap().unwrap();
1008 assert_eq!(first_response.status(), StatusCode::Created);
1009 let first_body = first_response.into_model().unwrap();
1010 assert_eq!(first_body.status(), PollerStatus::InProgress);
1011
1012 let second_result = poller.next().await;
1014 assert!(second_result.is_some());
1015 let second_response = second_result.unwrap().unwrap();
1016 assert_eq!(second_response.status(), StatusCode::Ok);
1017 let second_body = second_response.into_model().unwrap();
1018 assert_eq!(second_body.status(), PollerStatus::Failed);
1019
1020 let third_result = poller.next().await;
1022 assert!(third_result.is_none());
1023
1024 assert_eq!(*call_count.lock().unwrap(), 2);
1026 }
1027
1028 #[tokio::test]
1029 async fn poller_failed_with_http_429() {
1030 let call_count = Arc::new(Mutex::new(0));
1031
1032 let mock_client = {
1033 let call_count = call_count.clone();
1034 Arc::new(MockHttpClient::new(move |_| {
1035 let call_count = call_count.clone();
1036 async move {
1037 let mut count = call_count.lock().unwrap();
1038 *count += 1;
1039
1040 if *count == 1 {
1041 Ok(AsyncRawResponse::from_bytes(
1043 StatusCode::Ok,
1044 Headers::new(),
1045 br#"{"status":"InProgress"}"#.to_vec(),
1046 ))
1047 } else {
1048 Ok(AsyncRawResponse::from_bytes(
1050 StatusCode::TooManyRequests,
1051 Headers::new(),
1052 vec![],
1053 ))
1054 }
1055 }
1056 .boxed()
1057 }))
1058 };
1059
1060 let mut poller = Poller::new(
1061 move |_, _| {
1062 let client = mock_client.clone();
1063 Box::pin(async move {
1064 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1065 let raw_response = client
1066 .execute_request(&req)
1067 .await?
1068 .try_into_raw_response()
1069 .await?;
1070 let (status, headers, body) = raw_response.deconstruct();
1071
1072 if status == StatusCode::Ok {
1073 let test_status: TestStatus = crate::json::from_json(&body)?;
1074 let response: Response<TestStatus> =
1075 RawResponse::from_bytes(status, headers, body).into();
1076
1077 match test_status.status() {
1078 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1079 response,
1080 retry_after: Duration::ZERO,
1081 continuation: PollerContinuation::Links {
1082 next_link: req.url().clone(),
1083 final_link: None,
1084 },
1085 }),
1086 _ => Ok(PollerResult::Done { response }),
1087 }
1088 } else {
1089 let response: Response<TestStatus> =
1091 RawResponse::from_bytes(status, headers, body).into();
1092 Ok(PollerResult::Done { response })
1093 }
1094 })
1095 },
1096 None,
1097 );
1098
1099 let first_result = poller.next().await;
1101 assert!(first_result.is_some());
1102 assert!(first_result.unwrap().is_ok());
1103
1104 let second_result = poller.next().await;
1106 assert!(second_result.is_some());
1107 let error = second_result.unwrap().unwrap_err();
1108
1109 match error.kind() {
1111 ErrorKind::HttpResponse { status, .. } => {
1112 assert_eq!(*status, StatusCode::TooManyRequests);
1113 }
1114 _ => panic!("Expected HttpResponse error, got {:?}", error.kind()),
1115 }
1116
1117 assert_eq!(*call_count.lock().unwrap(), 2);
1119 }
1120
1121 #[tokio::test]
1122 async fn poller_into_future_succeeds() {
1123 let call_count = Arc::new(Mutex::new(0));
1124
1125 let mock_client = {
1126 let call_count = call_count.clone();
1127 Arc::new(MockHttpClient::new(move |_| {
1128 let call_count = call_count.clone();
1129 async move {
1130 let mut count = call_count.lock().unwrap();
1131 *count += 1;
1132
1133 if *count == 1 {
1134 Ok(AsyncRawResponse::from_bytes(
1136 StatusCode::Created,
1137 Headers::new(),
1138 br#"{"status":"InProgress"}"#.to_vec(),
1139 ))
1140 } else {
1141 Ok(AsyncRawResponse::from_bytes(
1143 StatusCode::Ok,
1144 Headers::new(),
1145 br#"{"status":"Succeeded","id":"op1","name":"Operation completed successfully"}"#.to_vec(),
1146 ))
1147 }
1148 }
1149 .boxed()
1150 }))
1151 };
1152
1153 let poller = Poller::new(
1154 move |_, _| {
1155 let client = mock_client.clone();
1156 Box::pin(async move {
1157 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1158 let raw_response = client.execute_request(&req).await?;
1159 let (status, headers, body) = raw_response.deconstruct();
1160 let bytes = body.collect().await?;
1161
1162 let test_status: TestStatus = crate::json::from_json(&bytes)?;
1163 let response: Response<TestStatus> =
1164 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1165
1166 match test_status.status() {
1167 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1168 response,
1169 retry_after: Duration::ZERO,
1170 continuation: PollerContinuation::Links {
1171 next_link: req.url().clone(),
1172 final_link: None,
1173 },
1174 }),
1175 PollerStatus::Succeeded => {
1176 Ok(PollerResult::Succeeded {
1178 response,
1179 target: Box::new(|| {
1180 Box::pin(async {
1181 use crate::http::headers::Headers;
1184 let headers = Headers::new();
1185 let bytes = bytes::Bytes::from(
1186 r#"{"id": "op1", "name": "Operation completed successfully"}"#,
1187 );
1188 Ok(RawResponse::from_bytes(StatusCode::Ok, headers, bytes)
1189 .into())
1190 })
1191 }),
1192 })
1193 }
1194 _ => Ok(PollerResult::Done { response }),
1195 }
1196 })
1197 },
1198 None,
1199 );
1200
1201 let result = poller.await;
1203 assert!(result.is_ok());
1204 let response = result.unwrap();
1205 assert_eq!(response.status(), StatusCode::Ok);
1206 let output = response.into_model().unwrap();
1207 assert_eq!(output.id.as_deref(), Some("op1"));
1208 assert_eq!(
1209 output.name.as_deref(),
1210 Some("Operation completed successfully")
1211 );
1212
1213 assert_eq!(*call_count.lock().unwrap(), 2);
1215 }
1216
1217 #[tokio::test]
1218 async fn poller_into_future_with_target_url() {
1219 let call_count = Arc::new(Mutex::new(0));
1220
1221 let mock_client = {
1222 let call_count = call_count.clone();
1223 Arc::new(MockHttpClient::new(move |req: &Request| {
1224 let call_count = call_count.clone();
1225 let url = req.url().to_string();
1226 async move {
1227 let mut count = call_count.lock().unwrap();
1228 *count += 1;
1229
1230 if *count == 1 {
1231 Ok(AsyncRawResponse::from_bytes(
1233 StatusCode::Accepted,
1234 Headers::new(),
1235 br#"{"status":"InProgress"}"#.to_vec(),
1236 ))
1237 } else if *count == 2 {
1238 Ok(AsyncRawResponse::from_bytes(
1240 StatusCode::Ok,
1241 Headers::new(),
1242 br#"{"status":"Succeeded","target":"https://example.com/resources/123"}"#.to_vec(),
1243 ))
1244 } else {
1245 assert_eq!(url, "https://example.com/resources/123");
1247 Ok(AsyncRawResponse::from_bytes(
1248 StatusCode::Ok,
1249 Headers::new(),
1250 br#"{"id":"123","name":"Test Resource"}"#.to_vec(),
1251 ))
1252 }
1253 }
1254 .boxed()
1255 }))
1256 };
1257
1258 let poller = Poller::new(
1259 move |_, _| {
1260 let client = mock_client.clone();
1261 Box::pin(async move {
1262 let req = Request::new(
1263 "https://example.com/operations/op1".parse().unwrap(),
1264 Method::Get,
1265 );
1266 let raw_response = client.execute_request(&req).await?;
1267 let (status, headers, body) = raw_response.deconstruct();
1268 let bytes = body.collect().await?;
1269
1270 let operation_status: TestStatus = crate::json::from_json(&bytes)?;
1271 let response: Response<TestStatus> =
1272 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1273
1274 match operation_status.status() {
1275 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1276 response,
1277 retry_after: Duration::ZERO,
1278 continuation: PollerContinuation::Links {
1279 next_link: req.url().clone(),
1280 final_link: None,
1281 },
1282 }),
1283 PollerStatus::Succeeded => {
1284 if let Some(target_url) = operation_status.target {
1286 let client_clone = client.clone();
1287 Ok(PollerResult::Succeeded {
1288 response,
1289 target: Box::new(move || {
1290 Box::pin(async move {
1291 let target_req = Request::new(
1292 target_url.parse().unwrap(),
1293 Method::Get,
1294 );
1295 let target_response =
1296 client_clone.execute_request(&target_req).await?;
1297 let (target_status, target_headers, target_body) =
1298 target_response.deconstruct();
1299 let target_bytes = target_body.collect().await?;
1300
1301 Ok(RawResponse::from_bytes(
1302 target_status,
1303 target_headers,
1304 target_bytes,
1305 )
1306 .into())
1307 })
1308 }),
1309 })
1310 } else {
1311 Err(crate::Error::new(
1312 ErrorKind::Other,
1313 "no target URL in succeeded response",
1314 ))
1315 }
1316 }
1317 _ => Ok(PollerResult::Done { response }),
1318 }
1319 })
1320 },
1321 None,
1322 );
1323
1324 let result = poller.await;
1326 assert!(result.is_ok());
1327 let response = result.unwrap();
1328 assert_eq!(response.status(), StatusCode::Ok);
1329 let resource = response.into_model().unwrap();
1330 assert_eq!(resource.id.as_deref(), Some("123"));
1331 assert_eq!(resource.name.as_deref(), Some("Test Resource"));
1332
1333 assert_eq!(*call_count.lock().unwrap(), 3);
1335 }
1336
1337 #[tokio::test]
1338 async fn poller_into_future_no_response_body() {
1339 #[derive(Debug, serde::Deserialize)]
1340 struct NoBodyStatus {
1341 status: String,
1342 }
1343
1344 impl StatusMonitor for NoBodyStatus {
1345 type Output = ();
1346 type Format = NoFormat;
1347
1348 fn status(&self) -> PollerStatus {
1349 self.status.parse().unwrap_or_default()
1350 }
1351 }
1352
1353 let call_count = Arc::new(Mutex::new(0));
1354
1355 let mock_client = {
1356 let call_count = call_count.clone();
1357 Arc::new(MockHttpClient::new(move |_| {
1358 let call_count = call_count.clone();
1359 async move {
1360 let mut count = call_count.lock().unwrap();
1361 *count += 1;
1362
1363 if *count == 1 {
1364 Ok(AsyncRawResponse::from_bytes(
1366 StatusCode::Accepted,
1367 Headers::new(),
1368 br#"{"status":"InProgress"}"#.to_vec(),
1369 ))
1370 } else {
1371 Ok(AsyncRawResponse::from_bytes(
1373 StatusCode::Ok,
1374 Headers::new(),
1375 br#"{"status":"Succeeded"}"#.to_vec(),
1376 ))
1377 }
1378 }
1379 .boxed()
1380 }))
1381 };
1382
1383 let poller = Poller::new(
1384 move |_, _| {
1385 let client = mock_client.clone();
1386 Box::pin(async move {
1387 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1388 let raw_response = client.execute_request(&req).await?;
1389 let (status, headers, body) = raw_response.deconstruct();
1390 let bytes = body.collect().await?;
1391
1392 let no_body_status: NoBodyStatus = crate::json::from_json(&bytes)?;
1393 let response: Response<NoBodyStatus> =
1394 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1395
1396 match no_body_status.status() {
1397 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1398 response,
1399 retry_after: Duration::ZERO,
1400 continuation: PollerContinuation::Links {
1401 next_link: req.url().clone(),
1402 final_link: None,
1403 },
1404 }),
1405 PollerStatus::Succeeded => {
1406 Ok(PollerResult::Succeeded {
1408 response,
1409 target: Box::new(move || {
1410 Box::pin(async move {
1411 use crate::http::headers::Headers;
1413 let headers = Headers::new();
1414 Ok(RawResponse::from_bytes(status, headers, Vec::new())
1415 .into())
1416 })
1417 }),
1418 })
1419 }
1420 _ => Ok(PollerResult::Done { response }),
1421 }
1422 })
1423 },
1424 None,
1425 );
1426
1427 let result = poller.await;
1429 assert!(result.is_ok());
1430 let response = result.unwrap();
1431 assert_eq!(response.status(), StatusCode::Ok);
1432 assert_eq!(*call_count.lock().unwrap(), 2);
1437 }
1438
1439 #[cfg(feature = "xml")]
1440 #[tokio::test]
1441 async fn poller_succeeded_xml() {
1442 let call_count = Arc::new(Mutex::new(0));
1443
1444 let mock_client = {
1445 let call_count = call_count.clone();
1446 Arc::new(MockHttpClient::new(move |_| {
1447 let call_count = call_count.clone();
1448 async move {
1449 let mut count = call_count.lock().unwrap();
1450 *count += 1;
1451
1452 if *count == 1 {
1453 Ok(AsyncRawResponse::from_bytes(
1455 StatusCode::Created,
1456 Headers::new(),
1457 b"<XmlTestStatus><status>InProgress</status></XmlTestStatus>".to_vec(),
1458 ))
1459 } else {
1460 Ok(AsyncRawResponse::from_bytes(
1462 StatusCode::Ok,
1463 Headers::new(),
1464 b"<XmlTestStatus><status>Succeeded</status></XmlTestStatus>".to_vec(),
1465 ))
1466 }
1467 }
1468 .boxed()
1469 }))
1470 };
1471
1472 let mut poller = Poller::new(
1473 move |_, _| {
1474 let client = mock_client.clone();
1475 Box::pin(async move {
1476 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1477 let raw_response = client.execute_request(&req).await?;
1478 let (status, headers, body) = raw_response.deconstruct();
1479 let bytes = body.collect().await?;
1480
1481 let test_status: XmlTestStatus = crate::xml::from_xml(&bytes)?;
1482 let response: Response<XmlTestStatus, XmlFormat> =
1483 RawResponse::from_bytes(status, headers, bytes).into();
1484
1485 match test_status.status() {
1486 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1487 response,
1488 retry_after: Duration::ZERO,
1489 continuation: PollerContinuation::Links {
1490 next_link: req.url().clone(),
1491 final_link: None,
1492 },
1493 }),
1494 _ => Ok(PollerResult::Done { response }),
1495 }
1496 })
1497 },
1498 None,
1499 );
1500
1501 let first_result = poller.next().await;
1503 assert!(first_result.is_some());
1504 let first_response = first_result.unwrap().unwrap();
1505 assert_eq!(first_response.status(), StatusCode::Created);
1506 let first_body = first_response.into_model().unwrap();
1507 assert_eq!(first_body.status(), PollerStatus::InProgress);
1508
1509 let second_result = poller.next().await;
1511 assert!(second_result.is_some());
1512 let second_response = second_result.unwrap().unwrap();
1513 assert_eq!(second_response.status(), StatusCode::Ok);
1514 let second_body = second_response.into_model().unwrap();
1515 assert_eq!(second_body.status(), PollerStatus::Succeeded);
1516
1517 let third_result = poller.next().await;
1519 assert!(third_result.is_none());
1520
1521 assert_eq!(*call_count.lock().unwrap(), 2);
1523 }
1524
1525 #[cfg(feature = "xml")]
1526 #[tokio::test]
1527 async fn poller_into_future_succeeds_xml() {
1528 let call_count = Arc::new(Mutex::new(0));
1529
1530 let mock_client = {
1531 let call_count = call_count.clone();
1532 Arc::new(MockHttpClient::new(move |_| {
1533 let call_count = call_count.clone();
1534 async move {
1535 let mut count = call_count.lock().unwrap();
1536 *count += 1;
1537
1538 if *count == 1 {
1539 Ok(AsyncRawResponse::from_bytes(
1541 StatusCode::Created,
1542 Headers::new(),
1543 b"<XmlTestStatus><status>InProgress</status></XmlTestStatus>"
1544 .to_vec(),
1545 ))
1546 } else {
1547 Ok(AsyncRawResponse::from_bytes(
1550 StatusCode::Ok,
1551 Headers::new(),
1552 b"<XmlTestStatus><status>Succeeded</status><id>op1</id><name>Operation completed successfully</name></XmlTestStatus>"
1553 .to_vec(),
1554 ))
1555 }
1556 }
1557 .boxed()
1558 }))
1559 };
1560
1561 let poller = Poller::new(
1562 move |_, _| {
1563 let client = mock_client.clone();
1564 Box::pin(async move {
1565 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1566 let raw_response = client.execute_request(&req).await?;
1567 let (status, headers, body) = raw_response.deconstruct();
1568 let bytes = body.collect().await?;
1569
1570 let test_status: XmlTestStatus = crate::xml::from_xml(&bytes)?;
1571 let response: Response<XmlTestStatus, XmlFormat> =
1572 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1573
1574 match test_status.status() {
1575 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1576 response,
1577 retry_after: Duration::ZERO,
1578 continuation: PollerContinuation::Links {
1579 next_link: req.url().clone(),
1580 final_link: None,
1581 },
1582 }),
1583 PollerStatus::Succeeded => {
1584 Ok(PollerResult::Succeeded {
1586 response,
1587 target: Box::new(move || {
1588 Box::pin(async move {
1589 let headers = Headers::new();
1591 let bytes = bytes::Bytes::from(
1592 r#"<TestOutput><id>op1</id><name>Operation completed successfully</name></TestOutput>"#,
1593 );
1594 Ok(RawResponse::from_bytes(StatusCode::Ok, headers, bytes)
1595 .into())
1596 })
1597 }),
1598 })
1599 }
1600 _ => Ok(PollerResult::Done { response }),
1601 }
1602 })
1603 },
1604 None,
1605 );
1606
1607 let result = poller.await;
1609 assert!(result.is_ok());
1610 let response = result.unwrap();
1611 assert_eq!(response.status(), StatusCode::Ok);
1612 let output = response.into_model().unwrap();
1613 assert_eq!(output.id.as_deref(), Some("op1"));
1614 assert_eq!(
1615 output.name.as_deref(),
1616 Some("Operation completed successfully")
1617 );
1618
1619 assert_eq!(*call_count.lock().unwrap(), 2);
1621 }
1622
1623 #[tokio::test]
1624 async fn poller_into_future_output_is_self() {
1625 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1627 struct SelfContainedStatus {
1628 status: String,
1629 id: Option<String>,
1630 result: Option<String>,
1631 }
1632
1633 impl StatusMonitor for SelfContainedStatus {
1634 type Output = Self; type Format = JsonFormat;
1636
1637 fn status(&self) -> PollerStatus {
1638 self.status.parse().unwrap_or_default()
1639 }
1640 }
1641
1642 let call_count = Arc::new(Mutex::new(0));
1643
1644 let mock_client = {
1645 let call_count = call_count.clone();
1646 Arc::new(MockHttpClient::new(move |_| {
1647 let call_count = call_count.clone();
1648 async move {
1649 let mut count = call_count.lock().unwrap();
1650 *count += 1;
1651
1652 if *count == 1 {
1653 Ok(AsyncRawResponse::from_bytes(
1655 StatusCode::Created,
1656 Headers::new(),
1657 br#"{"status":"InProgress","id":"op1"}"#.to_vec(),
1658 ))
1659 } else {
1660 Ok(AsyncRawResponse::from_bytes(
1662 StatusCode::Ok,
1663 Headers::new(),
1664 br#"{"status":"Succeeded","id":"op1","result":"Operation completed successfully"}"#.to_vec(),
1665 ))
1666 }
1667 }
1668 .boxed()
1669 }))
1670 };
1671
1672 let poller = Poller::new(
1673 move |_, _| {
1674 let client = mock_client.clone();
1675 Box::pin(async move {
1676 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1677 let raw_response = client.execute_request(&req).await?;
1678 let (status, headers, body) = raw_response.deconstruct();
1679 let bytes = body.collect().await?;
1680
1681 let self_status: SelfContainedStatus = crate::json::from_json(&bytes)?;
1682 let response: Response<SelfContainedStatus> =
1683 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1684
1685 match self_status.status() {
1686 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1687 response,
1688 retry_after: Duration::ZERO,
1689 continuation: PollerContinuation::Links {
1690 next_link: req.url().clone(),
1691 final_link: None,
1692 },
1693 }),
1694 PollerStatus::Succeeded => {
1695 let final_bytes = bytes.clone();
1698 Ok(PollerResult::Succeeded {
1699 response,
1700 target: Box::new(move || {
1701 Box::pin(async move {
1702 let headers = Headers::new();
1704 Ok(RawResponse::from_bytes(
1705 StatusCode::Ok,
1706 headers,
1707 final_bytes,
1708 )
1709 .into())
1710 })
1711 }),
1712 })
1713 }
1714 _ => Ok(PollerResult::Done { response }),
1715 }
1716 })
1717 },
1718 None,
1719 );
1720
1721 let result = poller.await;
1723 assert!(result.is_ok());
1724 let response = result.unwrap();
1725 assert_eq!(response.status(), StatusCode::Ok);
1726 let output = response.into_model().unwrap();
1727 assert_eq!(output.id.as_deref(), Some("op1"));
1728 assert_eq!(
1729 output.result.as_deref(),
1730 Some("Operation completed successfully")
1731 );
1732
1733 assert_eq!(*call_count.lock().unwrap(), 2);
1735 }
1736
1737 #[tokio::test]
1738 async fn poller_stream_output_is_self() {
1739 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1742 struct SelfContainedStatus {
1743 status: String,
1744 id: Option<String>,
1745 result: Option<String>,
1746 }
1747
1748 impl StatusMonitor for SelfContainedStatus {
1749 type Output = Self; type Format = JsonFormat;
1751
1752 fn status(&self) -> PollerStatus {
1753 self.status.parse().unwrap_or_default()
1754 }
1755 }
1756
1757 let call_count = Arc::new(Mutex::new(0));
1758
1759 let mock_client = {
1760 let call_count = call_count.clone();
1761 Arc::new(MockHttpClient::new(move |_| {
1762 let call_count = call_count.clone();
1763 async move {
1764 let mut count = call_count.lock().unwrap();
1765 *count += 1;
1766
1767 if *count == 1 {
1768 Ok(AsyncRawResponse::from_bytes(
1770 StatusCode::Created,
1771 Headers::new(),
1772 br#"{"status":"InProgress","id":"op1"}"#.to_vec(),
1773 ))
1774 } else {
1775 Ok(AsyncRawResponse::from_bytes(
1777 StatusCode::Ok,
1778 Headers::new(),
1779 br#"{"status":"Succeeded","id":"op1","result":"Operation completed successfully"}"#.to_vec(),
1780 ))
1781 }
1782 }
1783 .boxed()
1784 }))
1785 };
1786
1787 let mut poller = Poller::new(
1788 move |_, _| {
1789 let client = mock_client.clone();
1790 Box::pin(async move {
1791 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1792 let raw_response = client.execute_request(&req).await?;
1793 let (status, headers, body) = raw_response.deconstruct();
1794 let bytes = body.collect().await?;
1795
1796 let self_status: SelfContainedStatus = crate::json::from_json(&bytes)?;
1797 let response: Response<SelfContainedStatus> =
1798 RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1799
1800 match self_status.status() {
1801 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1802 response,
1803 retry_after: Duration::ZERO,
1804 continuation: PollerContinuation::Links {
1805 next_link: req.url().clone(),
1806 final_link: None,
1807 },
1808 }),
1809 PollerStatus::Succeeded => {
1810 let final_bytes = bytes.clone();
1812 Ok(PollerResult::Succeeded {
1813 response,
1814 target: Box::new(move || {
1815 Box::pin(async move {
1816 use crate::http::headers::Headers;
1817 let headers = Headers::new();
1818 Ok(RawResponse::from_bytes(
1819 StatusCode::Ok,
1820 headers,
1821 final_bytes,
1822 )
1823 .into())
1824 })
1825 }),
1826 })
1827 }
1828 _ => Ok(PollerResult::Done { response }),
1829 }
1830 })
1831 },
1832 None,
1833 );
1834
1835 let mut statuses = Vec::new();
1837 while let Some(status_response) = poller.try_next().await.unwrap() {
1838 let status = status_response.into_model().unwrap();
1839 statuses.push(status);
1840 }
1841
1842 assert_eq!(statuses.len(), 2);
1844 assert_eq!(statuses[0].status, "InProgress");
1845 assert_eq!(statuses[0].id.as_deref(), Some("op1"));
1846 assert_eq!(statuses[0].result, None);
1847
1848 assert_eq!(statuses[1].status, "Succeeded");
1849 assert_eq!(statuses[1].id.as_deref(), Some("op1"));
1850 assert_eq!(
1851 statuses[1].result.as_deref(),
1852 Some("Operation completed successfully")
1853 );
1854
1855 assert_eq!(*call_count.lock().unwrap(), 2);
1857 }
1858
1859 #[tokio::test]
1860 async fn into_future_output_model_and_raw_response() {
1861 let call_count = Arc::new(Mutex::new(0));
1862 let final_json = br#"{"id":"res1","name":"My Resource"}"#;
1863
1864 let mock_client = {
1865 let call_count = call_count.clone();
1866 Arc::new(MockHttpClient::new(move |_| {
1867 let call_count = call_count.clone();
1868 async move {
1869 let mut count = call_count.lock().unwrap();
1870 *count += 1;
1871
1872 if *count == 1 {
1873 Ok(AsyncRawResponse::from_bytes(
1874 StatusCode::Accepted,
1875 Headers::new(),
1876 br#"{"status":"InProgress"}"#.to_vec(),
1877 ))
1878 } else {
1879 Ok(AsyncRawResponse::from_bytes(
1880 StatusCode::Ok,
1881 Headers::new(),
1882 br#"{"status":"Succeeded"}"#.to_vec(),
1883 ))
1884 }
1885 }
1886 .boxed()
1887 }))
1888 };
1889
1890 let poller = Poller::new(
1891 move |_, _| {
1892 let client = mock_client.clone();
1893 Box::pin(async move {
1894 let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1895 let raw_response = client.execute_request(&req).await?;
1896 let (status, headers, body) = raw_response.deconstruct();
1897 let bytes = body.collect().await?;
1898
1899 let test_status: TestStatus = crate::json::from_json(&bytes)?;
1900 let response: Response<TestStatus> =
1901 RawResponse::from_bytes(status, headers.clone(), bytes).into();
1902
1903 match test_status.status() {
1904 PollerStatus::InProgress => Ok(PollerResult::InProgress {
1905 response,
1906 retry_after: Duration::ZERO,
1907 continuation: PollerContinuation::Links {
1908 next_link: req.url().clone(),
1909 final_link: None,
1910 },
1911 }),
1912 PollerStatus::Succeeded => {
1913 let target_body =
1914 bytes::Bytes::from_static(br#"{"id":"res1","name":"My Resource"}"#);
1915 Ok(PollerResult::Succeeded {
1916 response,
1917 target: Box::new(move || {
1918 Box::pin(async move {
1919 Ok(RawResponse::from_bytes(
1920 StatusCode::Ok,
1921 Headers::new(),
1922 target_body,
1923 )
1924 .into())
1925 })
1926 }),
1927 })
1928 }
1929 _ => Ok(PollerResult::Done { response }),
1930 }
1931 })
1932 },
1933 None,
1934 );
1935
1936 let response = poller.await.unwrap();
1938 assert_eq!(response.status(), StatusCode::Ok);
1939
1940 let raw = response.to_raw_response();
1942
1943 let output = response.into_model().unwrap();
1945 assert_eq!(output.id.as_deref(), Some("res1"));
1946 assert_eq!(output.name.as_deref(), Some("My Resource"));
1947
1948 let (raw_status, _, raw_body) = raw.deconstruct();
1950 assert_eq!(raw_status, StatusCode::Ok);
1951 assert_eq!(raw_body.as_ref(), final_json);
1952
1953 assert_eq!(*call_count.lock().unwrap(), 2);
1954 }
1955}