Skip to main content

azure_core/http/
poller.rs

1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT License.
3
4//! Types and methods for long-running operations (LROs).
5
6use crate::{
7    error::{ErrorKind, ErrorResponse},
8    http::{
9        headers::{HeaderName, Headers},
10        policies::create_public_api_span,
11        Context, Format, JsonFormat, Response, StatusCode, Url,
12    },
13    sleep,
14    time::{Duration, OffsetDateTime},
15    tracing::{Span, SpanStatus},
16};
17use futures::{channel::oneshot, stream::unfold, Stream, StreamExt};
18use serde::Deserialize;
19use std::{
20    convert::Infallible,
21    fmt,
22    future::{Future, IntoFuture},
23    pin::Pin,
24    str::FromStr,
25    sync::Arc,
26    task::{Context as TaskContext, Poll},
27};
28
/// Default retry time for long-running operations if no retry-after header is present.
///
/// This value is the same as the default used in other Azure SDKs e.g.,
/// <https://github.com/Azure/azure-sdk-for-python/blob/azure-core_1.35.0/sdk/core/azure-core/azure/core/polling/base_polling.py#L586>
///
/// It is the value `PollerOptions::default()` assigns to `PollerOptions::frequency`.
const DEFAULT_RETRY_TIME: Duration = Duration::seconds(30);

/// Minimum retry time for long-running operations.
///
/// `create_poller_stream` asserts that `PollerOptions::frequency` is at least this long.
const MIN_RETRY_TIME: Duration = Duration::seconds(1);
37
38/// Represents the state of a [`Poller`].
39#[derive(Debug, Clone, Default, PartialEq, Eq)]
40pub enum PollerState {
41    /// The poller should fetch the initial status.
42    #[default]
43    Initial,
44    /// The poller should fetch subsequent status.
45    More(PollerContinuation),
46}
47
/// Status reported by a long-running operation (LRO).
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum PollerStatus {
    /// The operation has not finished yet. This is the default status.
    #[default]
    InProgress,

    /// The operation finished successfully.
    Succeeded,

    /// The operation finished with a failure.
    Failed,

    /// The operation was canceled before it finished.
    Canceled,

    /// A status string that matches none of the known values, kept verbatim.
    UnknownValue(String),
}

impl From<&str> for PollerStatus {
    /// Maps a service status string onto a [`PollerStatus`].
    ///
    /// Matching ignores ASCII case. Any string that is not a known status is
    /// preserved unchanged in [`PollerStatus::UnknownValue`].
    fn from(value: &str) -> Self {
        // LRO status should be compared case-insensitively:
        // https://github.com/Azure/azure-sdk-for-rust/issues/2482
        let is = |known: &str| known.eq_ignore_ascii_case(value);

        // cspell:words inprogress
        if is("inprogress") {
            Self::InProgress
        } else if is("succeeded") {
            Self::Succeeded
        } else if is("failed") {
            Self::Failed
        } else if is("canceled") || is("cancelled") {
            // While the specification recommends "Canceled", in practice
            // numerous services use "Cancelled".
            Self::Canceled
        } else {
            Self::UnknownValue(value.to_owned())
        }
    }
}

impl FromStr for PollerStatus {
    type Err = Infallible;

    /// Parses a status string; this never fails because unknown strings map to
    /// [`PollerStatus::UnknownValue`].
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        Ok(Self::from(value))
    }
}
102
103impl<'de> Deserialize<'de> for PollerStatus {
104    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
105    where
106        D: serde::Deserializer<'de>,
107    {
108        struct PollerStatusVisitor;
109        impl serde::de::Visitor<'_> for PollerStatusVisitor {
110            type Value = PollerStatus;
111
112            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
113                formatter.write_str("a string representing a PollerStatus")
114            }
115
116            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
117            where
118                E: serde::de::Error,
119            {
120                FromStr::from_str(s).map_err(serde::de::Error::custom)
121            }
122        }
123
124        deserializer.deserialize_str(PollerStatusVisitor)
125    }
126}
127
128/// Options to create the [`Poller`].
129#[derive(Debug, Clone)]
130pub struct PollerOptions<'a> {
131    /// Allows customization of the method call.
132    pub context: Context<'a>,
133    /// The time to wait between polling intervals in absence of a `retry-after` header.
134    ///
135    /// The default is 30 seconds. The minimum time enforced by [`Poller::new`] is 1 second.
136    pub frequency: Duration,
137}
138
139impl Default for PollerOptions<'_> {
140    fn default() -> Self {
141        Self {
142            frequency: DEFAULT_RETRY_TIME,
143            context: Context::new(),
144        }
145    }
146}
147
148impl<'a> PollerOptions<'a> {
149    /// Converts these poller options into an owned form so they can outlive the current scope.
150    #[must_use]
151    pub fn into_owned(self) -> PollerOptions<'static> {
152        PollerOptions {
153            context: self.context.into_owned(),
154            frequency: self.frequency,
155        }
156    }
157}
158
/// The result of fetching the status monitor from a [`Poller`], whether the long-running operation (LRO) is in progress or done.
///
/// This is the value returned (asynchronously) by the callback passed to [`Poller::new`].
pub enum PollerResult<M, F = JsonFormat>
where
    M: StatusMonitor,
    F: Format,
{
    /// The long-running operation (LRO) is in progress and the next status monitor update may be fetched from `continuation`.
    ///
    /// # Fields
    ///
    /// * `response` contains the HTTP response with the status monitor.
    /// * `retry_after` is the [`Duration`] to wait before polling again.
    /// * `continuation` is the next link / continuation token.
    InProgress {
        /// The HTTP response with the status monitor.
        response: Response<M, F>,
        /// The [`Duration`] the poller sleeps before the next status request.
        retry_after: Duration,
        /// The next link / continuation token.
        continuation: PollerContinuation,
    },

    /// The long-running operation (LRO) reached a terminal state and `response` carries the terminal status monitor.
    ///
    /// Unlike [`PollerResult::Succeeded`], this variant supplies no callback to fetch a final output.
    ///
    /// # Fields
    ///
    /// * `response` contains the HTTP response with the status monitor in a terminal state.
    Done {
        /// The HTTP response with the status monitor in a terminal state.
        response: Response<M, F>,
    },

    /// The long-running operation (LRO) succeeded and contains the final status.
    ///
    /// # Fields
    ///
    /// * `response` contains the HTTP response with the final status monitor.
    /// * `target` is a callback returning a future that fetches the final output.
    Succeeded {
        /// The HTTP response with the final status monitor.
        response: Response<M, F>,
        /// A callback, invoked at most once, that returns a future fetching the final output.
        target: BoxedCallback<M>,
    },
}
204
205impl<M, F> fmt::Debug for PollerResult<M, F>
206where
207    M: StatusMonitor,
208    F: Format,
209{
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        match self {
212            Self::InProgress {
213                retry_after,
214                continuation,
215                ..
216            } => f
217                .debug_struct("InProgress")
218                .field("retry_after", &retry_after)
219                .field("continuation", &continuation)
220                .finish_non_exhaustive(),
221            Self::Done { .. } => f.debug_struct("Done").finish_non_exhaustive(),
222            Self::Succeeded { .. } => f.debug_struct("Succeeded").finish_non_exhaustive(),
223        }
224    }
225}
226
227/// Information returned by the server to poll subsequent status.
228#[derive(Clone, Debug, PartialEq, Eq)]
229#[non_exhaustive]
230pub enum PollerContinuation {
231    /// Polling status or result is found at the following links.
232    Links {
233        /// A link to poll subsequent status.
234        next_link: Url,
235
236        /// A link to get the final result.
237        ///
238        /// May be `None` if the final result is contained in the status monitor.
239        final_link: Option<Url>,
240    },
241}
242
243impl fmt::Display for PollerContinuation {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        match self {
246            PollerContinuation::Links { next_link, .. } => f.write_str(next_link.as_str()),
247        }
248    }
249}
250
/// Represents a status monitor for a long-running operation (LRO).
///
/// Implement this for the model deserialized from each polling response so a
/// [`Poller`] can report status and knows the type of the final result.
pub trait StatusMonitor {
    /// The model type returned after the long-running operation (LRO) has completed successfully.
    ///
    /// Set this to the unit type `()` if no final resource is expected.
    type Output;

    /// The format used to deserialize the `Output`.
    ///
    /// Set this to [`NoFormat`](crate::http::NoFormat) if no final resource is expected.
    type Format: Format + Send;

    /// Gets the [`PollerStatus`] from the status monitor.
    fn status(&self) -> PollerStatus;
}
266
// Boxed trait-object aliases shared by the poller implementation.
mod types {
    use super::{PollerResult, Response, StatusMonitor, Stream};
    use std::{future::Future, pin::Pin};

    /// A boxed, `Send` stream of status monitor responses.
    pub type BoxedStream<M, F> = Box<dyn Stream<Item = crate::Result<Response<M, F>>> + Send>;
    /// A boxed, `Send` future resolving to the final response, typed by the
    /// status monitor's `Output` and `Format`.
    pub type BoxedFuture<M> = Box<
        dyn Future<
                Output = crate::Result<
                    Response<<M as StatusMonitor>::Output, <M as StatusMonitor>::Format>,
                >,
            > + Send,
    >;
    /// A boxed, `Send`, call-once callback that produces the pinned final-response future.
    pub type BoxedCallback<M> = Box<dyn FnOnce() -> Pin<BoxedFuture<M>> + Send>;

    /// A pinned boxed [`Future`] that can be stored and called dynamically.
    pub type PollerResultFuture<M, F> =
        Pin<Box<dyn Future<Output = crate::Result<PollerResult<M, F>>> + Send + 'static>>;
}
285
286pub use types::PollerResultFuture;
287use types::{BoxedCallback, BoxedFuture, BoxedStream};
288
289/// Represents a long-running operation (LRO)
290///
291/// A `Poller` implements both [`IntoFuture`] and [`Stream`].
292/// You can `await` a `Poller` to get the final model upon successful completion; or,
293/// you can call [`next`](StreamExt::next) or [`try_next`](futures::stream::TryStreamExt::try_next) on a mutable `Poller` to poll status manually.
294///
295/// # Examples
296///
297/// For clients that return a `Poller`, you can await it to get the final result:
298///
299/// ```no_run
300/// # use azure_core::credentials::TokenCredential;
301/// # use azure_security_keyvault_certificates::{CertificateClient, models::CreateCertificateParameters};
302/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
303/// # let credential: std::sync::Arc<dyn TokenCredential> = unimplemented!();
304/// let client = CertificateClient::new(
305///     "https://my-vault.vault.azure.net",
306///     credential.clone(),
307///     None,
308/// )?;
309///
310/// let params = CreateCertificateParameters::default();
311///
312/// // Await the poller to get the final certificate.
313/// let certificate = client
314///     .create_certificate("my-cert", params.try_into()?, None)?
315///     .await?
316///     .into_model()?;
317/// # Ok(()) }
318/// ```
319///
320/// If you want to manually poll status updates, you can use the `Poller` as a stream:
321///
322/// ```no_run
323/// # use azure_core::credentials::TokenCredential;
324/// # use azure_security_keyvault_certificates::{CertificateClient, models::CreateCertificateParameters};
325/// # use futures::TryStreamExt;
326/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327/// # let credential: std::sync::Arc<dyn TokenCredential> = unimplemented!();
328/// let client = CertificateClient::new(
329///     "https://my-vault.vault.azure.net",
330///     credential.clone(),
331///     None,
332/// )?;
333///
334/// let params = CreateCertificateParameters::default();
335///
336/// // Manually poll status updates.
337/// let mut poller = client
338///     .create_certificate("my-cert", params.try_into()?, None)?;
339///
340/// while let Some(status) = poller.try_next().await? {
341///     let status = status.into_model()?;
342///     println!("Status: {:?}", status.status);
343/// }
344///
345/// // After the stream ends, await to get the final certificate.
346/// let certificate = poller.await?.into_model()?;
347/// # Ok(()) }
348/// ```
#[pin_project::pin_project]
pub struct Poller<M, F = JsonFormat>
where
    M: StatusMonitor,
    F: Format,
{
    /// The stream of status monitor responses; polled by the `Stream` impl and
    /// drained by `into_future`.
    #[pin]
    stream: Pin<BoxedStream<M, F>>,
    /// The future resolving to the final response; `Poller::new` always sets it
    /// to `Some`, and `into_future` consumes it.
    target: Option<BoxedFuture<M>>,
}
359
impl<M, F> Poller<M, F>
where
    M: StatusMonitor,
    F: Format + Send,
{
    /// Creates a [`Poller`] from a callback that will be called repeatedly to monitor a long-running operation (LRO).
    ///
    /// This method expects a callback that accepts a [`PollerState`] and the [`PollerOptions`] in effect,
    /// and returns a [`PollerResult`] value asynchronously.
    /// The continuation handed back to the callback in [`PollerState::More`] is a [`PollerContinuation`].
    /// The `M` type parameter must implement [`StatusMonitor`].
    ///
    /// The stream will yield [`Response`] values for each intermediate response while the operation is in progress
    /// i.e., while `M::status()` returns [`PollerStatus::InProgress`]. The stream ends when the operation completes
    /// successfully, fails, or is canceled.
    ///
    /// If `options` is `None`, [`PollerOptions::default`] is used.
    ///
    /// # Panics
    ///
    /// Panics if [`PollerOptions::frequency`] is less than 1 second.
    ///
    /// # Examples
    ///
    /// To poll a long-running operation:
    ///
    /// ```rust,no_run
    /// # use azure_core::{Result, json, http::{Context, JsonFormat, Pipeline, RawResponse, Request, Response, Method, Url, poller::{Poller, PollerContinuation, PollerResult, PollerState, PollerStatus, StatusMonitor}}};
    /// # use serde::Deserialize;
    /// # let api_version = "2025-06-04".to_string();
    /// # let pipeline: Pipeline = panic!("Not a runnable example");
    /// #[derive(Deserialize)]
    /// struct OperationResult {
    ///     id: String,
    ///     status: Option<PollerStatus>,
    ///     result: Option<String>,
    /// }
    ///
    /// impl StatusMonitor for OperationResult {
    ///     type Output = OperationResult;
    ///     type Format = JsonFormat;
    ///
    ///     fn status(&self) -> PollerStatus {
    ///         self.status.clone().unwrap_or_default()
    ///     }
    /// }
    ///
    /// let url = "https://example.com/my_operation".parse().unwrap();
    /// let mut req = Request::new(url, Method::Post);
    ///
    /// let poller = Poller::new(move |operation_state: PollerState, poller_options| {
    ///     // The callback must be 'static, so you have to clone and move any values you want to use.
    ///     let pipeline = pipeline.clone();
    ///     let api_version = api_version.clone();
    ///     let mut req = req.clone();
    ///     Box::pin(async move {
    ///         if let PollerState::More(continuation) = operation_state {
    ///             // Use the continuation to get the next URL for polling
    ///             let next_link = match continuation {
    ///                 PollerContinuation::Links { next_link, .. } => next_link,
    ///                 _ => unreachable!(),
    ///             };
    ///             *req.url_mut() = next_link.clone();
    ///             req.set_method(Method::Get);
    ///         }
    ///
    ///         req.url_mut()
    ///             .query_pairs_mut()
    ///             .append_pair("api-version", &api_version);
    ///
    ///         let resp = pipeline
    ///             .send(&poller_options.context, &mut req, None)
    ///             .await?;
    ///         let (status, headers, body) = resp.deconstruct();
    ///         let result: OperationResult = json::from_json(&body)?;
    ///         let final_body = body.clone(); // Clone before moving into Response
    ///         let resp: Response<OperationResult> = RawResponse::from_bytes(status, headers, body).into();
    ///
    ///         match result.status() {
    ///             PollerStatus::InProgress => {
    ///                 // Continue polling with the operation URL from the response
    ///                 let next_link = format!("https://example.com/operations/{}", result.id).parse()?;
    ///                 Ok(PollerResult::InProgress {
    ///                     response: resp,
    ///                     retry_after: poller_options.frequency,
    ///                     continuation: PollerContinuation::Links { next_link, final_link: None },
    ///                 })
    ///             }
    ///             PollerStatus::Succeeded => {
    ///                 // The result is in the operation response; otherwise, get the target URL
    ///                 // from the response headers or body and asynchronously fetch the operation target.
    ///                 Ok(PollerResult::Succeeded {
    ///                     response: resp,
    ///                     target: Box::new(move || {
    ///                         Box::pin(async move {
    ///                             // In this example, the final result is already in the status response
    ///                             // In other cases, you might fetch from a target URL
    ///                             use azure_core::http::headers::Headers;
    ///                             Ok(RawResponse::from_bytes(status, Headers::new(), final_body).into())
    ///                         })
    ///                     }),
    ///                 })
    ///             }
    ///             _ => Ok(PollerResult::Done { response: resp })
    ///         }
    ///     })
    /// }, None);
    /// ```
    pub fn new<Fun>(make_request: Fun, options: Option<PollerOptions<'static>>) -> Self
    where
        M: Send + 'static,
        M::Output: Send + 'static,
        M::Format: Send + 'static,
        Fun: Fn(PollerState, PollerOptions<'static>) -> PollerResultFuture<M, F> + Send + 'static,
    {
        let options = options.unwrap_or_default();
        // The stream drives polling; the target future later resolves to the final response.
        let (stream, target) = create_poller_stream(make_request, options);
        Self {
            stream: Box::pin(stream),
            target: Some(target),
        }
    }
}
480
481impl<M, F> Stream for Poller<M, F>
482where
483    M: StatusMonitor,
484    F: Format,
485{
486    type Item = crate::Result<Response<M, F>>;
487
488    fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
489        let state = self.project().stream.poll_next(cx);
490        if let Poll::Ready(Some(Ok(ref response))) = state {
491            check_status_code(response)?;
492        }
493
494        state
495    }
496}
497
498impl<M, F> IntoFuture for Poller<M, F>
499where
500    M: StatusMonitor + 'static,
501    M::Output: Send + 'static,
502    M::Format: Send + 'static,
503    F: Format + 'static,
504{
505    type Output = crate::Result<Response<M::Output, M::Format>>;
506
507    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
508
509    fn into_future(mut self) -> Self::IntoFuture {
510        Box::pin(async move {
511            // Poll the stream until completion
512            while let Some(result) = self.stream.next().await {
513                // Check if we got an error from the stream
514                result?;
515            }
516
517            // Extract the target future
518            let target = self.target.ok_or_else(|| {
519                crate::Error::new(
520                    ErrorKind::Other,
521                    "poller completed without a target response",
522                )
523            })?;
524
525            // Pin and await the target future to get the final response
526            Box::into_pin(target).await
527        })
528    }
529}
530
531impl<M, F> fmt::Debug for Poller<M, F>
532where
533    M: StatusMonitor,
534    F: Format,
535{
536    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537        f.write_str("Poller")
538    }
539}
/// Internal state of the poller stream built by `create_poller_stream`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum State {
    /// No request has been made yet.
    Init,
    /// A previous response was in progress; poll again with this continuation.
    InProgress(PollerContinuation),
    /// Polling is finished; the stream ends the next time it is polled.
    Done,
}
546
/// The payload sent over the oneshot channel for the target future.
///
/// It holds the pinned target future and, when the poller stream added its own span to the
/// context, that context (otherwise `None`).
type TargetTransmitterType<'a, M> = (Pin<BoxedFuture<M>>, Option<Context<'a>>);
549
/// Represents the state used for each iteration through the poller stream.
///
/// The whole value is threaded through `unfold`, so the callback and options are moved
/// from one iteration to the next.
struct StreamState<'a, M, Fun>
where
    M: StatusMonitor,
{
    /// The current polling state (Init, InProgress, or Done)
    state: State,
    /// The callback function to make requests
    make_request: Fun,
    /// Optional channel sender for the target future; taken (left as `None`) once the
    /// target has been sent.
    target_tx: Option<oneshot::Sender<TargetTransmitterType<'a, M>>>,
    /// The poller options; a clone is passed to every `make_request` call.
    options: PollerOptions<'a>,
    /// Whether a span was added to the context
    added_span: bool,
}
566
/// Builds the status stream and the final-response future that back a [`Poller`].
///
/// The returned stream calls `make_request` once per item: first with [`PollerState::Initial`],
/// then with [`PollerState::More`] while the callback keeps returning
/// [`PollerResult::InProgress`]. The stream ends after the first error, `Done`, or `Succeeded` item.
///
/// The returned future waits on a oneshot channel for the target future that is sent when the
/// callback returns [`PollerResult::Succeeded`], then awaits it. If the channel yields an error
/// instead, the future resolves to an error stating that no target was defined.
///
/// # Panics
///
/// Panics if `options.frequency` is less than `MIN_RETRY_TIME` (1 second).
fn create_poller_stream<
    M,
    F: Format,
    Fun: Fn(PollerState, PollerOptions<'static>) -> PollerResultFuture<M, F> + Send + 'static,
>(
    make_request: Fun,
    options: PollerOptions<'static>,
) -> (
    impl Stream<Item = crate::Result<Response<M, F>>> + 'static,
    BoxedFuture<M>,
)
where
    M: StatusMonitor + 'static,
    M::Output: Send + 'static,
    M::Format: Send + 'static,
{
    // Channel used to hand the target future (and optionally the span-carrying context)
    // from the stream to the final-response future below.
    let (target_tx, target_rx) = oneshot::channel();

    assert!(
        options.frequency >= MIN_RETRY_TIME,
        "minimum polling frequency is 1 second"
    );
    let stream = unfold(
        // We flow the `make_request` callback through the state value to avoid cloning.
        StreamState::<M, Fun> {
            state: State::Init,
            make_request,
            target_tx: Some(target_tx),
            options,
            added_span: false,
        },
        move |mut poller_stream_state| async move {
            // Step 1: issue the request appropriate for the current state.
            let result = match poller_stream_state.state {
                State::Init => {
                    // At the very start of polling, create a span for the entire request, and attach it to the context
                    let span =
                        create_public_api_span(&poller_stream_state.options.context, None, None);
                    if let Some(ref s) = span {
                        poller_stream_state.added_span = true;
                        poller_stream_state.options.context =
                            poller_stream_state.options.context.with_value(s.clone());
                    }
                    (poller_stream_state.make_request)(
                        PollerState::Initial,
                        poller_stream_state.options.clone(),
                    )
                    .await
                }
                State::InProgress(continuation) => {
                    tracing::debug!(
                        "subsequent operation request to {:?}",
                        continuation.to_string()
                    );
                    (poller_stream_state.make_request)(
                        PollerState::More(continuation),
                        poller_stream_state.options.clone(),
                    )
                    .await
                }
                State::Done => {
                    // A previous iteration finished the operation; end the stream.
                    tracing::debug!("done");
                    return None;
                }
            };
            // Step 2: translate the callback result into a stream item and the next state.
            let (item, next_state) = match result {
                Err(e) => {
                    // Only touch the span if this stream created it.
                    if poller_stream_state.added_span {
                        if let Some(span) =
                            poller_stream_state.options.context.value::<Arc<dyn Span>>()
                        {
                            // Mark the span as an error with an appropriate description.
                            span.set_status(SpanStatus::Error {
                                description: e.to_string(),
                            });
                            span.set_attribute("error.type", e.kind().to_string().into());
                            span.end();
                        }
                    }

                    // Yield the error, then end the stream on the next poll.
                    poller_stream_state.state = State::Done;
                    return Some((Err(e), poller_stream_state));
                }
                Ok(PollerResult::InProgress {
                    response,
                    retry_after,
                    continuation: n,
                }) => {
                    // Note that test-proxy automatically adds a transform that zeroes an existing `retry-after` header during playback, so don't check at runtime:
                    // <https://github.com/Azure/azure-sdk-tools/blob/a80b559d7682891f36a491b73f52fcb679d40923/tools/test-proxy/Azure.Sdk.Tools.TestProxy/RecordingHandler.cs#L1175>
                    tracing::trace!("retry poller in {}s", retry_after.whole_seconds());
                    // The wait happens before the in-progress response is yielded.
                    sleep(retry_after).await;

                    (Ok(response), State::InProgress(n))
                }
                // Note that we will normally never reach this state. The normal progression of the `make_request` callback is to return `Succeeded` with a target future,
                // and then the stream yields the final response and transitions to `Done` state.
                // The only time that the `make_request` callback will normally enter the `Done` state directly is if the LRO fails or is canceled.
                // NOTE(review): no span is ended on this path even when `added_span` is set — confirm whether that is intended.
                Ok(PollerResult::Done { response }) => (Ok(response), State::Done),
                Ok(PollerResult::Succeeded {
                    response,
                    target: get_target,
                }) => {
                    // Send the target callback through the channel
                    if let Some(tx) = poller_stream_state.target_tx.take() {
                        // A send failure is deliberately ignored here.
                        let _ = tx.send((
                            get_target(),
                            // Pass the context along only when this stream created the span,
                            // so the receiver knows whether it should end it.
                            if poller_stream_state.added_span {
                                Some(poller_stream_state.options.context.clone())
                            } else {
                                None
                            },
                        ));
                    }
                    // Also yield the final status response
                    poller_stream_state.state = State::Done;
                    return Some((Ok(response), poller_stream_state));
                }
            };

            // Update state and return
            poller_stream_state.state = next_state;
            Some((item, poller_stream_state))
        },
    );

    // The final-response future: waits for the target sent by the stream, awaits it,
    // and finalizes the span if the stream created one.
    let target = Box::new(async move {
        match target_rx.await {
            Ok(target_state) => {
                // Await the target future to get the final response from the poller.
                let res = target_state.0.await;
                // If we added a span to the target, take the result of the final target future to finalize the span.

                if let Some(ctx) = target_state.1 {
                    match &res {
                        Ok(response) => {
                            // When the result is done, finalize the span. Note that we only do that if we created the span in the first place,
                            // otherwise it is the responsibility of the caller to end their span.
                            if let Some(span) = ctx.value::<Arc<dyn Span>>() {
                                // 5xx status codes SHOULD set status to Error.
                                // The description should not be set because it can be inferred from "http.response.status_code".
                                if response.status().is_server_error() {
                                    span.set_status(SpanStatus::Error {
                                        description: "".to_string(),
                                    });
                                }
                                // Both 4xx and 5xx responses record the status code as the error type.
                                if response.status().is_client_error()
                                    || response.status().is_server_error()
                                {
                                    span.set_attribute(
                                        "error.type",
                                        response.status().to_string().into(),
                                    );
                                }

                                span.end();
                            }
                        }
                        Err(err) => {
                            // The target future failed: record the error on the span and end it.
                            if let Some(span) = ctx.value::<Arc<dyn Span>>() {
                                span.set_status(SpanStatus::Error {
                                    description: err.to_string(),
                                });
                                span.set_attribute("error.type", err.kind().to_string().into());
                                span.end();
                            }
                        }
                    }
                }
                res
            }
            // The channel produced an error instead of a target, so there is no final response to fetch.
            Err(err) => Err(crate::Error::with_error(
                ErrorKind::Other,
                err,
                "poller completed without defining a target",
            )),
        }
    });

    (stream, target)
}
747
748/// Get the retry duration from the operation response or [`PollerOptions`].
749pub fn get_retry_after(
750    headers: &Headers,
751    retry_headers: &[HeaderName],
752    options: &PollerOptions,
753) -> Duration {
754    #[cfg_attr(feature = "test", allow(unused_mut))]
755    let duration =
756        crate::http::policies::get_retry_after(headers, OffsetDateTime::now_utc, retry_headers)
757            .unwrap_or(options.frequency);
758
759    #[cfg(feature = "test")]
760    {
761        use crate::test::RecordingMode;
762
763        // Even though test-proxy will zero an existing `after-retry` (or similar proprietary) header during playback,
764        // we need to override the frequency for services which do not send back supported headers in their response.
765        if matches!(headers.get_optional::<RecordingMode>(), Ok(Some(mode)) if mode == RecordingMode::Playback)
766        {
767            if duration > Duration::ZERO {
768                tracing::debug!(
769                    "overriding {}s poller retry in playback",
770                    duration.whole_seconds()
771                );
772            }
773
774            return Duration::ZERO;
775        }
776    }
777
778    duration
779}
780
781fn check_status_code<T, F: Format>(response: &Response<T, F>) -> crate::Result<()> {
782    let status = response.status();
783    match status {
784        StatusCode::Ok | StatusCode::Accepted | StatusCode::Created | StatusCode::NoContent => {
785            Ok(())
786        }
787        _ => {
788            // Ideally we could take an owned `Response` and move data to avoid cloning the `RawResponse`.
789            let raw_response = Box::new(response.to_raw_response());
790            let error_code = F::deserialize(raw_response.body())
791                .ok()
792                .and_then(|err: ErrorResponse| err.error)
793                .and_then(|details| details.code);
794            Err(ErrorKind::HttpResponse {
795                status,
796                error_code,
797                raw_response: Some(raw_response),
798            }
799            .into_error())
800        }
801    }
802}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    #[cfg(feature = "xml")]
808    use crate::http::XmlFormat;
809    use crate::http::{
810        headers::Headers, AsyncRawResponse, HttpClient, Method, NoFormat, RawResponse, Request,
811    };
812    use azure_core_test::http::MockHttpClient;
813    use futures::{FutureExt as _, TryStreamExt as _};
814    use std::sync::{Arc, Mutex};
815
    /// JSON status payload returned by the mock service in these tests.
    #[derive(Debug, serde::Deserialize)]
    struct TestStatus {
        /// Raw status text from the response body.
        status: String,
        /// Optional `target` value from the payload; `None` when the field is absent.
        #[serde(default)]
        target: Option<String>,
    }
822
    /// Final result model used as the `StatusMonitor::Output` of the test status types.
    #[derive(Debug, serde::Deserialize)]
    struct TestOutput {
        /// Optional `id` field; `None` when absent from the payload.
        #[serde(default)]
        id: Option<String>,
        /// Optional `name` field; `None` when absent from the payload.
        #[serde(default)]
        name: Option<String>,
    }
830
831    impl StatusMonitor for TestStatus {
832        type Output = TestOutput;
833        type Format = JsonFormat;
834
835        fn status(&self) -> PollerStatus {
836            self.status.parse().unwrap_or_default()
837        }
838    }
839
    /// XML status payload returned by the mock service; only compiled with the `xml` feature.
    #[cfg(feature = "xml")]
    #[derive(Debug, serde::Deserialize)]
    struct XmlTestStatus {
        /// Raw status text from the response body.
        status: String,
    }
845
846    #[cfg(feature = "xml")]
847    impl StatusMonitor for XmlTestStatus {
848        type Output = TestOutput;
849        type Format = XmlFormat;
850
851        fn status(&self) -> PollerStatus {
852            self.status.parse().unwrap_or_default()
853        }
854    }
855
856    #[tokio::test]
857    async fn poller_succeeded() {
858        let call_count = Arc::new(Mutex::new(0));
859
860        let mock_client = {
861            let call_count = call_count.clone();
862            Arc::new(MockHttpClient::new(move |_| {
863                let call_count = call_count.clone();
864                async move {
865                    let mut count = call_count.lock().unwrap();
866                    *count += 1;
867
868                    if *count == 1 {
869                        // First call returns 201 Created with InProgress status
870                        Ok(AsyncRawResponse::from_bytes(
871                            StatusCode::Created,
872                            Headers::new(),
873                            br#"{"status":"InProgress"}"#.to_vec(),
874                        ))
875                    } else {
876                        // Second call returns 200 OK with Succeeded status
877                        Ok(AsyncRawResponse::from_bytes(
878                            StatusCode::Ok,
879                            Headers::new(),
880                            br#"{"status":"Succeeded"}"#.to_vec(),
881                        ))
882                    }
883                }
884                .boxed()
885            }))
886        };
887
888        let mut poller = Poller::new(
889            move |_, _| {
890                let client = mock_client.clone();
891                Box::pin(async move {
892                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
893                    let raw_response = client.execute_request(&req).await?;
894                    let (status, headers, body) = raw_response.deconstruct();
895                    let bytes = body.collect().await?;
896
897                    let test_status: TestStatus = crate::json::from_json(&bytes)?;
898                    let response: Response<TestStatus> =
899                        RawResponse::from_bytes(status, headers, bytes).into();
900
901                    match test_status.status() {
902                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
903                            response,
904                            retry_after: Duration::ZERO,
905                            continuation: PollerContinuation::Links {
906                                next_link: req.url().clone(),
907                                final_link: None,
908                            },
909                        }),
910                        _ => Ok(PollerResult::Done { response }),
911                    }
912                })
913            },
914            None,
915        );
916
917        // First poll should succeed (201 Created with InProgress)
918        let first_result = poller.next().await;
919        assert!(first_result.is_some());
920        let first_response = first_result.unwrap().unwrap();
921        assert_eq!(first_response.status(), StatusCode::Created);
922        let first_body = first_response.into_model().unwrap();
923        assert_eq!(first_body.status(), PollerStatus::InProgress);
924
925        // Second poll should succeed (200 OK with Succeeded)
926        let second_result = poller.next().await;
927        assert!(second_result.is_some());
928        let second_response = second_result.unwrap().unwrap();
929        assert_eq!(second_response.status(), StatusCode::Ok);
930        let second_body = second_response.into_model().unwrap();
931        assert_eq!(second_body.status(), PollerStatus::Succeeded);
932
933        // Third poll should return None (end of stream)
934        let third_result = poller.next().await;
935        assert!(third_result.is_none());
936
937        // Verify both calls were made
938        assert_eq!(*call_count.lock().unwrap(), 2);
939    }
940
941    #[tokio::test]
942    async fn poller_failed() {
943        let call_count = Arc::new(Mutex::new(0));
944
945        let mock_client = {
946            let call_count = call_count.clone();
947            Arc::new(MockHttpClient::new(move |_| {
948                let call_count = call_count.clone();
949                async move {
950                    let mut count = call_count.lock().unwrap();
951                    *count += 1;
952
953                    if *count == 1 {
954                        // First call returns 201 Created with InProgress status
955                        Ok(AsyncRawResponse::from_bytes(
956                            StatusCode::Created,
957                            Headers::new(),
958                            br#"{"status":"InProgress"}"#.to_vec(),
959                        ))
960                    } else {
961                        // Second call returns 200 OK with Failed status
962                        Ok(AsyncRawResponse::from_bytes(
963                            StatusCode::Ok,
964                            Headers::new(),
965                            br#"{"status":"Failed"}"#.to_vec(),
966                        ))
967                    }
968                }
969                .boxed()
970            }))
971        };
972        let mut poller = Poller::new(
973            move |_, _| {
974                let client = mock_client.clone();
975                Box::pin(async move {
976                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
977                    let raw_response = client
978                        .execute_request(&req)
979                        .await?
980                        .try_into_raw_response()
981                        .await?;
982                    let (status, headers, body) = raw_response.deconstruct();
983
984                    let test_status: TestStatus = crate::json::from_json(&body)?;
985                    let response: Response<TestStatus> =
986                        RawResponse::from_bytes(status, headers, body).into();
987
988                    match test_status.status() {
989                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
990                            response,
991                            retry_after: Duration::ZERO,
992                            continuation: PollerContinuation::Links {
993                                next_link: req.url().clone(),
994                                final_link: None,
995                            },
996                        }),
997                        _ => Ok(PollerResult::Done { response }),
998                    }
999                })
1000            },
1001            None,
1002        );
1003
1004        // First poll should succeed (201 Created with InProgress)
1005        let first_result = poller.next().await;
1006        assert!(first_result.is_some());
1007        let first_response = first_result.unwrap().unwrap();
1008        assert_eq!(first_response.status(), StatusCode::Created);
1009        let first_body = first_response.into_model().unwrap();
1010        assert_eq!(first_body.status(), PollerStatus::InProgress);
1011
1012        // Second poll should succeed (200 OK with Succeeded)
1013        let second_result = poller.next().await;
1014        assert!(second_result.is_some());
1015        let second_response = second_result.unwrap().unwrap();
1016        assert_eq!(second_response.status(), StatusCode::Ok);
1017        let second_body = second_response.into_model().unwrap();
1018        assert_eq!(second_body.status(), PollerStatus::Failed);
1019
1020        // Third poll should return None (end of stream)
1021        let third_result = poller.next().await;
1022        assert!(third_result.is_none());
1023
1024        // Verify both calls were made
1025        assert_eq!(*call_count.lock().unwrap(), 2);
1026    }
1027
1028    #[tokio::test]
1029    async fn poller_failed_with_http_429() {
1030        let call_count = Arc::new(Mutex::new(0));
1031
1032        let mock_client = {
1033            let call_count = call_count.clone();
1034            Arc::new(MockHttpClient::new(move |_| {
1035                let call_count = call_count.clone();
1036                async move {
1037                    let mut count = call_count.lock().unwrap();
1038                    *count += 1;
1039
1040                    if *count == 1 {
1041                        // First call returns 200 OK with InProgress status
1042                        Ok(AsyncRawResponse::from_bytes(
1043                            StatusCode::Ok,
1044                            Headers::new(),
1045                            br#"{"status":"InProgress"}"#.to_vec(),
1046                        ))
1047                    } else {
1048                        // Second call returns 429 Too Many Requests
1049                        Ok(AsyncRawResponse::from_bytes(
1050                            StatusCode::TooManyRequests,
1051                            Headers::new(),
1052                            vec![],
1053                        ))
1054                    }
1055                }
1056                .boxed()
1057            }))
1058        };
1059
1060        let mut poller = Poller::new(
1061            move |_, _| {
1062                let client = mock_client.clone();
1063                Box::pin(async move {
1064                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1065                    let raw_response = client
1066                        .execute_request(&req)
1067                        .await?
1068                        .try_into_raw_response()
1069                        .await?;
1070                    let (status, headers, body) = raw_response.deconstruct();
1071
1072                    if status == StatusCode::Ok {
1073                        let test_status: TestStatus = crate::json::from_json(&body)?;
1074                        let response: Response<TestStatus> =
1075                            RawResponse::from_bytes(status, headers, body).into();
1076
1077                        match test_status.status() {
1078                            PollerStatus::InProgress => Ok(PollerResult::InProgress {
1079                                response,
1080                                retry_after: Duration::ZERO,
1081                                continuation: PollerContinuation::Links {
1082                                    next_link: req.url().clone(),
1083                                    final_link: None,
1084                                },
1085                            }),
1086                            _ => Ok(PollerResult::Done { response }),
1087                        }
1088                    } else {
1089                        // Return the error response which should trigger check_status_code
1090                        let response: Response<TestStatus> =
1091                            RawResponse::from_bytes(status, headers, body).into();
1092                        Ok(PollerResult::Done { response })
1093                    }
1094                })
1095            },
1096            None,
1097        );
1098
1099        // First poll should succeed (200 OK with InProgress)
1100        let first_result = poller.next().await;
1101        assert!(first_result.is_some());
1102        assert!(first_result.unwrap().is_ok());
1103
1104        // Second poll should fail due to 429 status code being rejected by check_status_code
1105        let second_result = poller.next().await;
1106        assert!(second_result.is_some());
1107        let error = second_result.unwrap().unwrap_err();
1108
1109        // Verify the error is an HttpResponse error with 429 status
1110        match error.kind() {
1111            ErrorKind::HttpResponse { status, .. } => {
1112                assert_eq!(*status, StatusCode::TooManyRequests);
1113            }
1114            _ => panic!("Expected HttpResponse error, got {:?}", error.kind()),
1115        }
1116
1117        // Verify both calls were made
1118        assert_eq!(*call_count.lock().unwrap(), 2);
1119    }
1120
1121    #[tokio::test]
1122    async fn poller_into_future_succeeds() {
1123        let call_count = Arc::new(Mutex::new(0));
1124
1125        let mock_client = {
1126            let call_count = call_count.clone();
1127            Arc::new(MockHttpClient::new(move |_| {
1128                let call_count = call_count.clone();
1129                async move {
1130                    let mut count = call_count.lock().unwrap();
1131                    *count += 1;
1132
1133                    if *count == 1 {
1134                        // First call returns 201 Created with InProgress status
1135                        Ok(AsyncRawResponse::from_bytes(
1136                            StatusCode::Created,
1137                            Headers::new(),
1138                            br#"{"status":"InProgress"}"#.to_vec(),
1139                        ))
1140                    } else {
1141                        // Second call returns 200 OK with Succeeded status and final result
1142                        Ok(AsyncRawResponse::from_bytes(
1143                            StatusCode::Ok,
1144                            Headers::new(),
1145                            br#"{"status":"Succeeded","id":"op1","name":"Operation completed successfully"}"#.to_vec(),
1146                        ))
1147                    }
1148                }
1149                .boxed()
1150            }))
1151        };
1152
1153        let poller = Poller::new(
1154            move |_, _| {
1155                let client = mock_client.clone();
1156                Box::pin(async move {
1157                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1158                    let raw_response = client.execute_request(&req).await?;
1159                    let (status, headers, body) = raw_response.deconstruct();
1160                    let bytes = body.collect().await?;
1161
1162                    let test_status: TestStatus = crate::json::from_json(&bytes)?;
1163                    let response: Response<TestStatus> =
1164                        RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1165
1166                    match test_status.status() {
1167                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1168                            response,
1169                            retry_after: Duration::ZERO,
1170                            continuation: PollerContinuation::Links {
1171                                next_link: req.url().clone(),
1172                                final_link: None,
1173                            },
1174                        }),
1175                        PollerStatus::Succeeded => {
1176                            // Return the status response with a callback to fetch the final resource
1177                            Ok(PollerResult::Succeeded {
1178                                response,
1179                                target: Box::new(|| {
1180                                    Box::pin(async {
1181                                        // In a real scenario, this would fetch the final resource
1182                                        // For this test, the final status already contains the result
1183                                        use crate::http::headers::Headers;
1184                                        let headers = Headers::new();
1185                                        let bytes = bytes::Bytes::from(
1186                                            r#"{"id": "op1", "name": "Operation completed successfully"}"#,
1187                                        );
1188                                        Ok(RawResponse::from_bytes(StatusCode::Ok, headers, bytes)
1189                                            .into())
1190                                    })
1191                                }),
1192                            })
1193                        }
1194                        _ => Ok(PollerResult::Done { response }),
1195                    }
1196                })
1197            },
1198            None,
1199        );
1200
1201        // Use IntoFuture to await completion
1202        let result = poller.await;
1203        assert!(result.is_ok());
1204        let response = result.unwrap();
1205        assert_eq!(response.status(), StatusCode::Ok);
1206        let output = response.into_model().unwrap();
1207        assert_eq!(output.id.as_deref(), Some("op1"));
1208        assert_eq!(
1209            output.name.as_deref(),
1210            Some("Operation completed successfully")
1211        );
1212
1213        // Verify both calls were made
1214        assert_eq!(*call_count.lock().unwrap(), 2);
1215    }
1216
1217    #[tokio::test]
1218    async fn poller_into_future_with_target_url() {
1219        let call_count = Arc::new(Mutex::new(0));
1220
1221        let mock_client = {
1222            let call_count = call_count.clone();
1223            Arc::new(MockHttpClient::new(move |req: &Request| {
1224                let call_count = call_count.clone();
1225                let url = req.url().to_string();
1226                async move {
1227                    let mut count = call_count.lock().unwrap();
1228                    *count += 1;
1229
1230                    if *count == 1 {
1231                        // First call to operation URL returns InProgress status
1232                        Ok(AsyncRawResponse::from_bytes(
1233                            StatusCode::Accepted,
1234                            Headers::new(),
1235                            br#"{"status":"InProgress"}"#.to_vec(),
1236                        ))
1237                    } else if *count == 2 {
1238                        // Second call to operation URL returns Succeeded with target URL
1239                        Ok(AsyncRawResponse::from_bytes(
1240                            StatusCode::Ok,
1241                            Headers::new(),
1242                            br#"{"status":"Succeeded","target":"https://example.com/resources/123"}"#.to_vec(),
1243                        ))
1244                    } else {
1245                        // Third call fetches the final resource from target URL
1246                        assert_eq!(url, "https://example.com/resources/123");
1247                        Ok(AsyncRawResponse::from_bytes(
1248                            StatusCode::Ok,
1249                            Headers::new(),
1250                            br#"{"id":"123","name":"Test Resource"}"#.to_vec(),
1251                        ))
1252                    }
1253                }
1254                .boxed()
1255            }))
1256        };
1257
1258        let poller = Poller::new(
1259            move |_, _| {
1260                let client = mock_client.clone();
1261                Box::pin(async move {
1262                    let req = Request::new(
1263                        "https://example.com/operations/op1".parse().unwrap(),
1264                        Method::Get,
1265                    );
1266                    let raw_response = client.execute_request(&req).await?;
1267                    let (status, headers, body) = raw_response.deconstruct();
1268                    let bytes = body.collect().await?;
1269
1270                    let operation_status: TestStatus = crate::json::from_json(&bytes)?;
1271                    let response: Response<TestStatus> =
1272                        RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1273
1274                    match operation_status.status() {
1275                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1276                            response,
1277                            retry_after: Duration::ZERO,
1278                            continuation: PollerContinuation::Links {
1279                                next_link: req.url().clone(),
1280                                final_link: None,
1281                            },
1282                        }),
1283                        PollerStatus::Succeeded => {
1284                            // Return the status response with a callback to fetch the final resource
1285                            if let Some(target_url) = operation_status.target {
1286                                let client_clone = client.clone();
1287                                Ok(PollerResult::Succeeded {
1288                                    response,
1289                                    target: Box::new(move || {
1290                                        Box::pin(async move {
1291                                            let target_req = Request::new(
1292                                                target_url.parse().unwrap(),
1293                                                Method::Get,
1294                                            );
1295                                            let target_response =
1296                                                client_clone.execute_request(&target_req).await?;
1297                                            let (target_status, target_headers, target_body) =
1298                                                target_response.deconstruct();
1299                                            let target_bytes = target_body.collect().await?;
1300
1301                                            Ok(RawResponse::from_bytes(
1302                                                target_status,
1303                                                target_headers,
1304                                                target_bytes,
1305                                            )
1306                                            .into())
1307                                        })
1308                                    }),
1309                                })
1310                            } else {
1311                                Err(crate::Error::new(
1312                                    ErrorKind::Other,
1313                                    "no target URL in succeeded response",
1314                                ))
1315                            }
1316                        }
1317                        _ => Ok(PollerResult::Done { response }),
1318                    }
1319                })
1320            },
1321            None,
1322        );
1323
1324        // Use IntoFuture to await completion
1325        let result = poller.await;
1326        assert!(result.is_ok());
1327        let response = result.unwrap();
1328        assert_eq!(response.status(), StatusCode::Ok);
1329        let resource = response.into_model().unwrap();
1330        assert_eq!(resource.id.as_deref(), Some("123"));
1331        assert_eq!(resource.name.as_deref(), Some("Test Resource"));
1332
1333        // Verify all three calls were made
1334        assert_eq!(*call_count.lock().unwrap(), 3);
1335    }
1336
1337    #[tokio::test]
1338    async fn poller_into_future_no_response_body() {
1339        #[derive(Debug, serde::Deserialize)]
1340        struct NoBodyStatus {
1341            status: String,
1342        }
1343
1344        impl StatusMonitor for NoBodyStatus {
1345            type Output = ();
1346            type Format = NoFormat;
1347
1348            fn status(&self) -> PollerStatus {
1349                self.status.parse().unwrap_or_default()
1350            }
1351        }
1352
1353        let call_count = Arc::new(Mutex::new(0));
1354
1355        let mock_client = {
1356            let call_count = call_count.clone();
1357            Arc::new(MockHttpClient::new(move |_| {
1358                let call_count = call_count.clone();
1359                async move {
1360                    let mut count = call_count.lock().unwrap();
1361                    *count += 1;
1362
1363                    if *count == 1 {
1364                        // First call returns 202 Accepted with InProgress status
1365                        Ok(AsyncRawResponse::from_bytes(
1366                            StatusCode::Accepted,
1367                            Headers::new(),
1368                            br#"{"status":"InProgress"}"#.to_vec(),
1369                        ))
1370                    } else {
1371                        // Second call returns 200 OK with Succeeded status
1372                        Ok(AsyncRawResponse::from_bytes(
1373                            StatusCode::Ok,
1374                            Headers::new(),
1375                            br#"{"status":"Succeeded"}"#.to_vec(),
1376                        ))
1377                    }
1378                }
1379                .boxed()
1380            }))
1381        };
1382
1383        let poller = Poller::new(
1384            move |_, _| {
1385                let client = mock_client.clone();
1386                Box::pin(async move {
1387                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1388                    let raw_response = client.execute_request(&req).await?;
1389                    let (status, headers, body) = raw_response.deconstruct();
1390                    let bytes = body.collect().await?;
1391
1392                    let no_body_status: NoBodyStatus = crate::json::from_json(&bytes)?;
1393                    let response: Response<NoBodyStatus> =
1394                        RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1395
1396                    match no_body_status.status() {
1397                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1398                            response,
1399                            retry_after: Duration::ZERO,
1400                            continuation: PollerContinuation::Links {
1401                                next_link: req.url().clone(),
1402                                final_link: None,
1403                            },
1404                        }),
1405                        PollerStatus::Succeeded => {
1406                            // Return the status response with a callback
1407                            Ok(PollerResult::Succeeded {
1408                                response,
1409                                target: Box::new(move || {
1410                                    Box::pin(async move {
1411                                        // Return a Response<()> with no body for operations that don't return data
1412                                        use crate::http::headers::Headers;
1413                                        let headers = Headers::new();
1414                                        Ok(RawResponse::from_bytes(status, headers, Vec::new())
1415                                            .into())
1416                                    })
1417                                }),
1418                            })
1419                        }
1420                        _ => Ok(PollerResult::Done { response }),
1421                    }
1422                })
1423            },
1424            None,
1425        );
1426
1427        // Use IntoFuture to await completion
1428        let result = poller.await;
1429        assert!(result.is_ok());
1430        let response = result.unwrap();
1431        assert_eq!(response.status(), StatusCode::Ok);
1432        // For operations with no response body, we don't need to call into_model()
1433        // The important thing is that the poller completed successfully and returned Response<()>
1434
1435        // Verify both calls were made
1436        assert_eq!(*call_count.lock().unwrap(), 2);
1437    }
1438
1439    #[cfg(feature = "xml")]
1440    #[tokio::test]
1441    async fn poller_succeeded_xml() {
1442        let call_count = Arc::new(Mutex::new(0));
1443
1444        let mock_client = {
1445            let call_count = call_count.clone();
1446            Arc::new(MockHttpClient::new(move |_| {
1447                let call_count = call_count.clone();
1448                async move {
1449                    let mut count = call_count.lock().unwrap();
1450                    *count += 1;
1451
1452                    if *count == 1 {
1453                        // First call returns 201 Created with InProgress status
1454                        Ok(AsyncRawResponse::from_bytes(
1455                            StatusCode::Created,
1456                            Headers::new(),
1457                            b"<XmlTestStatus><status>InProgress</status></XmlTestStatus>".to_vec(),
1458                        ))
1459                    } else {
1460                        // Second call returns 200 OK with Succeeded status
1461                        Ok(AsyncRawResponse::from_bytes(
1462                            StatusCode::Ok,
1463                            Headers::new(),
1464                            b"<XmlTestStatus><status>Succeeded</status></XmlTestStatus>".to_vec(),
1465                        ))
1466                    }
1467                }
1468                .boxed()
1469            }))
1470        };
1471
1472        let mut poller = Poller::new(
1473            move |_, _| {
1474                let client = mock_client.clone();
1475                Box::pin(async move {
1476                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1477                    let raw_response = client.execute_request(&req).await?;
1478                    let (status, headers, body) = raw_response.deconstruct();
1479                    let bytes = body.collect().await?;
1480
1481                    let test_status: XmlTestStatus = crate::xml::from_xml(&bytes)?;
1482                    let response: Response<XmlTestStatus, XmlFormat> =
1483                        RawResponse::from_bytes(status, headers, bytes).into();
1484
1485                    match test_status.status() {
1486                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1487                            response,
1488                            retry_after: Duration::ZERO,
1489                            continuation: PollerContinuation::Links {
1490                                next_link: req.url().clone(),
1491                                final_link: None,
1492                            },
1493                        }),
1494                        _ => Ok(PollerResult::Done { response }),
1495                    }
1496                })
1497            },
1498            None,
1499        );
1500
1501        // First poll should succeed (201 Created with InProgress)
1502        let first_result = poller.next().await;
1503        assert!(first_result.is_some());
1504        let first_response = first_result.unwrap().unwrap();
1505        assert_eq!(first_response.status(), StatusCode::Created);
1506        let first_body = first_response.into_model().unwrap();
1507        assert_eq!(first_body.status(), PollerStatus::InProgress);
1508
1509        // Second poll should succeed (200 OK with Succeeded)
1510        let second_result = poller.next().await;
1511        assert!(second_result.is_some());
1512        let second_response = second_result.unwrap().unwrap();
1513        assert_eq!(second_response.status(), StatusCode::Ok);
1514        let second_body = second_response.into_model().unwrap();
1515        assert_eq!(second_body.status(), PollerStatus::Succeeded);
1516
1517        // Third poll should return None (end of stream)
1518        let third_result = poller.next().await;
1519        assert!(third_result.is_none());
1520
1521        // Verify both calls were made
1522        assert_eq!(*call_count.lock().unwrap(), 2);
1523    }
1524
/// Awaits an XML-formatted [`Poller`] through `IntoFuture` and checks the final output model.
///
/// The mock transport answers the first request with `201 Created` / `InProgress` and every
/// later request with `200 OK` / `Succeeded`. Once the status monitor reports `Succeeded`,
/// the poller is handed a `target` callback that yields a fixed `<TestOutput>` document,
/// and that document is what the awaited poller must resolve to.
#[cfg(feature = "xml")]
#[tokio::test]
async fn poller_into_future_succeeds_xml() {
    // Number of requests that reached the mock transport.
    let call_count = Arc::new(Mutex::new(0));

    let mock_client = {
        let call_count = call_count.clone();
        Arc::new(MockHttpClient::new(move |_| {
            let call_count = call_count.clone();
            async move {
                // Keep the guard in its own scope so the lock is released
                // before the response is assembled.
                let call_number = {
                    let mut count = call_count.lock().unwrap();
                    *count += 1;
                    *count
                };

                match call_number {
                    // First call returns 201 Created with InProgress status.
                    1 => Ok(AsyncRawResponse::from_bytes(
                        StatusCode::Created,
                        Headers::new(),
                        b"<XmlTestStatus><status>InProgress</status></XmlTestStatus>"
                            .to_vec(),
                    )),
                    // Later calls return 200 OK with Succeeded status. The payload
                    // carries the status and the final output fields together.
                    _ => Ok(AsyncRawResponse::from_bytes(
                        StatusCode::Ok,
                        Headers::new(),
                        b"<XmlTestStatus><status>Succeeded</status><id>op1</id><name>Operation completed successfully</name></XmlTestStatus>"
                            .to_vec(),
                    )),
                }
            }
            .boxed()
        }))
    };

    let poller = Poller::new(
        move |_, _| {
            let client = mock_client.clone();
            Box::pin(async move {
                let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
                let (status, headers, body) = client.execute_request(&req).await?.deconstruct();
                let bytes = body.collect().await?;

                // Parse the status first; the buffered parts are then moved (not cloned)
                // into the typed response because nothing below reads them again.
                let test_status: XmlTestStatus = crate::xml::from_xml(&bytes)?;
                let response: Response<XmlTestStatus, XmlFormat> =
                    RawResponse::from_bytes(status, headers, bytes).into();

                match test_status.status() {
                    PollerStatus::InProgress => Ok(PollerResult::InProgress {
                        response,
                        retry_after: Duration::ZERO,
                        continuation: PollerContinuation::Links {
                            next_link: req.url().clone(),
                            final_link: None,
                        },
                    }),
                    // Hand back the status response together with a callback that
                    // produces the final XML output document.
                    PollerStatus::Succeeded => Ok(PollerResult::Succeeded {
                        response,
                        target: Box::new(move || {
                            Box::pin(async move {
                                let final_body = bytes::Bytes::from(
                                    r#"<TestOutput><id>op1</id><name>Operation completed successfully</name></TestOutput>"#,
                                );
                                Ok(RawResponse::from_bytes(
                                    StatusCode::Ok,
                                    Headers::new(),
                                    final_body,
                                )
                                .into())
                            })
                        }),
                    }),
                    _ => Ok(PollerResult::Done { response }),
                }
            })
        },
        None,
    );

    // Use IntoFuture to await completion; `expect` reports the error if polling failed.
    let response = poller
        .await
        .expect("poller should run to completion successfully");
    assert_eq!(response.status(), StatusCode::Ok);

    let output = response.into_model().unwrap();
    assert_eq!(output.id.as_deref(), Some("op1"));
    assert_eq!(
        output.name.as_deref(),
        Some("Operation completed successfully")
    );

    // Verify both calls were made.
    assert_eq!(*call_count.lock().unwrap(), 2);
}
1622
1623    #[tokio::test]
1624    async fn poller_into_future_output_is_self() {
1625        // Test case where StatusMonitor::Output is the same type as the status monitor itself
1626        #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1627        struct SelfContainedStatus {
1628            status: String,
1629            id: Option<String>,
1630            result: Option<String>,
1631        }
1632
1633        impl StatusMonitor for SelfContainedStatus {
1634            type Output = Self; // Output is the same type as the monitor
1635            type Format = JsonFormat;
1636
1637            fn status(&self) -> PollerStatus {
1638                self.status.parse().unwrap_or_default()
1639            }
1640        }
1641
1642        let call_count = Arc::new(Mutex::new(0));
1643
1644        let mock_client = {
1645            let call_count = call_count.clone();
1646            Arc::new(MockHttpClient::new(move |_| {
1647                let call_count = call_count.clone();
1648                async move {
1649                    let mut count = call_count.lock().unwrap();
1650                    *count += 1;
1651
1652                    if *count == 1 {
1653                        // First call returns 201 Created with InProgress status
1654                        Ok(AsyncRawResponse::from_bytes(
1655                            StatusCode::Created,
1656                            Headers::new(),
1657                            br#"{"status":"InProgress","id":"op1"}"#.to_vec(),
1658                        ))
1659                    } else {
1660                        // Second call returns 200 OK with Succeeded status and final result in the same object
1661                        Ok(AsyncRawResponse::from_bytes(
1662                            StatusCode::Ok,
1663                            Headers::new(),
1664                            br#"{"status":"Succeeded","id":"op1","result":"Operation completed successfully"}"#.to_vec(),
1665                        ))
1666                    }
1667                }
1668                .boxed()
1669            }))
1670        };
1671
1672        let poller = Poller::new(
1673            move |_, _| {
1674                let client = mock_client.clone();
1675                Box::pin(async move {
1676                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1677                    let raw_response = client.execute_request(&req).await?;
1678                    let (status, headers, body) = raw_response.deconstruct();
1679                    let bytes = body.collect().await?;
1680
1681                    let self_status: SelfContainedStatus = crate::json::from_json(&bytes)?;
1682                    let response: Response<SelfContainedStatus> =
1683                        RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1684
1685                    match self_status.status() {
1686                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1687                            response,
1688                            retry_after: Duration::ZERO,
1689                            continuation: PollerContinuation::Links {
1690                                next_link: req.url().clone(),
1691                                final_link: None,
1692                            },
1693                        }),
1694                        PollerStatus::Succeeded => {
1695                            // The final result is already in the status response itself
1696                            // No separate fetch needed - just return the same response in the callback
1697                            let final_bytes = bytes.clone();
1698                            Ok(PollerResult::Succeeded {
1699                                response,
1700                                target: Box::new(move || {
1701                                    Box::pin(async move {
1702                                        // Return the same data - no additional fetch needed
1703                                        let headers = Headers::new();
1704                                        Ok(RawResponse::from_bytes(
1705                                            StatusCode::Ok,
1706                                            headers,
1707                                            final_bytes,
1708                                        )
1709                                        .into())
1710                                    })
1711                                }),
1712                            })
1713                        }
1714                        _ => Ok(PollerResult::Done { response }),
1715                    }
1716                })
1717            },
1718            None,
1719        );
1720
1721        // Use IntoFuture to await completion
1722        let result = poller.await;
1723        assert!(result.is_ok());
1724        let response = result.unwrap();
1725        assert_eq!(response.status(), StatusCode::Ok);
1726        let output = response.into_model().unwrap();
1727        assert_eq!(output.id.as_deref(), Some("op1"));
1728        assert_eq!(
1729            output.result.as_deref(),
1730            Some("Operation completed successfully")
1731        );
1732
1733        // Verify both calls were made
1734        assert_eq!(*call_count.lock().unwrap(), 2);
1735    }
1736
1737    #[tokio::test]
1738    async fn poller_stream_output_is_self() {
1739        // Test case where StatusMonitor::Output is the same type as the status monitor itself
1740        // Used as a stream to monitor progress
1741        #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1742        struct SelfContainedStatus {
1743            status: String,
1744            id: Option<String>,
1745            result: Option<String>,
1746        }
1747
1748        impl StatusMonitor for SelfContainedStatus {
1749            type Output = Self; // Output is the same type as the monitor
1750            type Format = JsonFormat;
1751
1752            fn status(&self) -> PollerStatus {
1753                self.status.parse().unwrap_or_default()
1754            }
1755        }
1756
1757        let call_count = Arc::new(Mutex::new(0));
1758
1759        let mock_client = {
1760            let call_count = call_count.clone();
1761            Arc::new(MockHttpClient::new(move |_| {
1762                let call_count = call_count.clone();
1763                async move {
1764                    let mut count = call_count.lock().unwrap();
1765                    *count += 1;
1766
1767                    if *count == 1 {
1768                        // First call returns 201 Created with InProgress status
1769                        Ok(AsyncRawResponse::from_bytes(
1770                            StatusCode::Created,
1771                            Headers::new(),
1772                            br#"{"status":"InProgress","id":"op1"}"#.to_vec(),
1773                        ))
1774                    } else {
1775                        // Second call returns 200 OK with Succeeded status and final result in the same object
1776                        Ok(AsyncRawResponse::from_bytes(
1777                            StatusCode::Ok,
1778                            Headers::new(),
1779                            br#"{"status":"Succeeded","id":"op1","result":"Operation completed successfully"}"#.to_vec(),
1780                        ))
1781                    }
1782                }
1783                .boxed()
1784            }))
1785        };
1786
1787        let mut poller = Poller::new(
1788            move |_, _| {
1789                let client = mock_client.clone();
1790                Box::pin(async move {
1791                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1792                    let raw_response = client.execute_request(&req).await?;
1793                    let (status, headers, body) = raw_response.deconstruct();
1794                    let bytes = body.collect().await?;
1795
1796                    let self_status: SelfContainedStatus = crate::json::from_json(&bytes)?;
1797                    let response: Response<SelfContainedStatus> =
1798                        RawResponse::from_bytes(status, headers.clone(), bytes.clone()).into();
1799
1800                    match self_status.status() {
1801                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1802                            response,
1803                            retry_after: Duration::ZERO,
1804                            continuation: PollerContinuation::Links {
1805                                next_link: req.url().clone(),
1806                                final_link: None,
1807                            },
1808                        }),
1809                        PollerStatus::Succeeded => {
1810                            // The final result is already in the status response itself
1811                            let final_bytes = bytes.clone();
1812                            Ok(PollerResult::Succeeded {
1813                                response,
1814                                target: Box::new(move || {
1815                                    Box::pin(async move {
1816                                        use crate::http::headers::Headers;
1817                                        let headers = Headers::new();
1818                                        Ok(RawResponse::from_bytes(
1819                                            StatusCode::Ok,
1820                                            headers,
1821                                            final_bytes,
1822                                        )
1823                                        .into())
1824                                    })
1825                                }),
1826                            })
1827                        }
1828                        _ => Ok(PollerResult::Done { response }),
1829                    }
1830                })
1831            },
1832            None,
1833        );
1834
1835        // Use as a stream to monitor progress
1836        let mut statuses = Vec::new();
1837        while let Some(status_response) = poller.try_next().await.unwrap() {
1838            let status = status_response.into_model().unwrap();
1839            statuses.push(status);
1840        }
1841
1842        // Should have received both InProgress and Succeeded statuses
1843        assert_eq!(statuses.len(), 2);
1844        assert_eq!(statuses[0].status, "InProgress");
1845        assert_eq!(statuses[0].id.as_deref(), Some("op1"));
1846        assert_eq!(statuses[0].result, None);
1847
1848        assert_eq!(statuses[1].status, "Succeeded");
1849        assert_eq!(statuses[1].id.as_deref(), Some("op1"));
1850        assert_eq!(
1851            statuses[1].result.as_deref(),
1852            Some("Operation completed successfully")
1853        );
1854
1855        // Verify both calls were made
1856        assert_eq!(*call_count.lock().unwrap(), 2);
1857    }
1858
1859    #[tokio::test]
1860    async fn into_future_output_model_and_raw_response() {
1861        let call_count = Arc::new(Mutex::new(0));
1862        let final_json = br#"{"id":"res1","name":"My Resource"}"#;
1863
1864        let mock_client = {
1865            let call_count = call_count.clone();
1866            Arc::new(MockHttpClient::new(move |_| {
1867                let call_count = call_count.clone();
1868                async move {
1869                    let mut count = call_count.lock().unwrap();
1870                    *count += 1;
1871
1872                    if *count == 1 {
1873                        Ok(AsyncRawResponse::from_bytes(
1874                            StatusCode::Accepted,
1875                            Headers::new(),
1876                            br#"{"status":"InProgress"}"#.to_vec(),
1877                        ))
1878                    } else {
1879                        Ok(AsyncRawResponse::from_bytes(
1880                            StatusCode::Ok,
1881                            Headers::new(),
1882                            br#"{"status":"Succeeded"}"#.to_vec(),
1883                        ))
1884                    }
1885                }
1886                .boxed()
1887            }))
1888        };
1889
1890        let poller = Poller::new(
1891            move |_, _| {
1892                let client = mock_client.clone();
1893                Box::pin(async move {
1894                    let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
1895                    let raw_response = client.execute_request(&req).await?;
1896                    let (status, headers, body) = raw_response.deconstruct();
1897                    let bytes = body.collect().await?;
1898
1899                    let test_status: TestStatus = crate::json::from_json(&bytes)?;
1900                    let response: Response<TestStatus> =
1901                        RawResponse::from_bytes(status, headers.clone(), bytes).into();
1902
1903                    match test_status.status() {
1904                        PollerStatus::InProgress => Ok(PollerResult::InProgress {
1905                            response,
1906                            retry_after: Duration::ZERO,
1907                            continuation: PollerContinuation::Links {
1908                                next_link: req.url().clone(),
1909                                final_link: None,
1910                            },
1911                        }),
1912                        PollerStatus::Succeeded => {
1913                            let target_body =
1914                                bytes::Bytes::from_static(br#"{"id":"res1","name":"My Resource"}"#);
1915                            Ok(PollerResult::Succeeded {
1916                                response,
1917                                target: Box::new(move || {
1918                                    Box::pin(async move {
1919                                        Ok(RawResponse::from_bytes(
1920                                            StatusCode::Ok,
1921                                            Headers::new(),
1922                                            target_body,
1923                                        )
1924                                        .into())
1925                                    })
1926                                }),
1927                            })
1928                        }
1929                        _ => Ok(PollerResult::Done { response }),
1930                    }
1931                })
1932            },
1933            None,
1934        );
1935
1936        // Await the poller to get Response<TestOutput> (the StatusMonitor::Output type).
1937        let response = poller.await.unwrap();
1938        assert_eq!(response.status(), StatusCode::Ok);
1939
1940        // Clone the RawResponse before consuming via into_model().
1941        let raw = response.to_raw_response();
1942
1943        // Deserialize into the Output model and verify fields.
1944        let output = response.into_model().unwrap();
1945        assert_eq!(output.id.as_deref(), Some("res1"));
1946        assert_eq!(output.name.as_deref(), Some("My Resource"));
1947
1948        // The RawResponse still holds the full JSON body.
1949        let (raw_status, _, raw_body) = raw.deconstruct();
1950        assert_eq!(raw_status, StatusCode::Ok);
1951        assert_eq!(raw_body.as_ref(), final_json);
1952
1953        assert_eq!(*call_count.lock().unwrap(), 2);
1954    }
1955}