Skip to main content

floopy/resources/
decisions.rs

1use std::sync::Arc;
2
3use futures::Stream;
4use reqwest::Method;
5
6use crate::constants::{decision_by_id, ENDPOINT_DECISIONS};
7use crate::error::Result;
8use crate::http::HttpTransport;
9use crate::options::RequestOptions;
10use crate::types::{Decision, DecisionListPage, DecisionListParams};
11
12use super::require;
13
14/// Reads the decision audit trail.
15pub struct Decisions {
16    t: Arc<HttpTransport>,
17}
18
19impl Decisions {
20    pub(crate) fn new(t: Arc<HttpTransport>) -> Self {
21        Self { t }
22    }
23
24    /// Fetch one decision by request id.
25    ///
26    /// # Errors
27    /// Returns an [`Error`](crate::Error) on a non-2xx response or transport
28    /// failure.
29    pub async fn get(
30        &self,
31        request_id: &str,
32        req: impl Into<Option<RequestOptions>>,
33    ) -> Result<Decision> {
34        let (data, _) = self
35            .t
36            .request(
37                Method::GET,
38                &decision_by_id(request_id),
39                None,
40                &[],
41                req.into().as_ref(),
42            )
43            .await?;
44        require(data)
45    }
46
47    /// Fetch a single page of decisions.
48    ///
49    /// # Errors
50    /// Returns an [`Error`](crate::Error) on a non-2xx response or transport
51    /// failure.
52    pub async fn list(
53        &self,
54        params: &DecisionListParams,
55        req: impl Into<Option<RequestOptions>>,
56    ) -> Result<DecisionListPage> {
57        let (data, _) = self
58            .t
59            .request(
60                Method::GET,
61                ENDPOINT_DECISIONS,
62                None,
63                &params.query(),
64                req.into().as_ref(),
65            )
66            .await?;
67        require(data)
68    }
69
70    /// Stream one [`DecisionListPage`] per network round-trip until the
71    /// gateway reports no more pages.
72    pub fn pages(
73        &self,
74        params: DecisionListParams,
75        req: Option<RequestOptions>,
76    ) -> impl Stream<Item = Result<DecisionListPage>> + Send + 'static {
77        let t = self.t.clone();
78        async_stream::try_stream! {
79            let mut params = params;
80            loop {
81                let (data, _) = t
82                    .request::<DecisionListPage>(
83                        Method::GET,
84                        ENDPOINT_DECISIONS,
85                        None,
86                        &params.query(),
87                        req.as_ref(),
88                    )
89                    .await?;
90                let page = require(data)?;
91                let next = page.next_cursor.clone();
92                let has_more = page.has_more;
93                yield page;
94                match next {
95                    Some(cursor) if has_more && !cursor.is_empty() => {
96                        params.cursor = Some(cursor);
97                    }
98                    _ => break,
99                }
100            }
101        }
102    }
103
104    /// Stream every decision across all pages.
105    pub fn iter(
106        &self,
107        params: DecisionListParams,
108        req: Option<RequestOptions>,
109    ) -> impl Stream<Item = Result<Decision>> + Send + 'static {
110        let pages = self.pages(params, req);
111        async_stream::try_stream! {
112            futures::pin_mut!(pages);
113            while let Some(page) = futures::StreamExt::next(&mut pages).await {
114                let page = page?;
115                for decision in page.items {
116                    yield decision;
117                }
118            }
119        }
120    }
121}