Skip to main content

floopy/resources/
export.rs

1use std::pin::Pin;
2use std::sync::{Arc, Mutex};
3use std::task::{Context, Poll};
4
5use futures::stream::BoxStream;
6use futures::{Stream, StreamExt};
7use reqwest::Method;
8use serde_json::Value;
9
10use crate::constants::ENDPOINT_EXPORT_DECISIONS;
11use crate::error::{Error, Result};
12use crate::http::HttpTransport;
13use crate::options::RequestOptions;
14use crate::types::{ExportDecisionsParams, ExportTrailer, ExportedDecisionRow};
15
16/// Streams the decision log.
17pub struct Export {
18    t: Arc<HttpTransport>,
19}
20
21fn is_trailer(value: &Value) -> bool {
22    value.get("trailer").and_then(Value::as_bool) == Some(true)
23}
24
25fn parse_row(value: Value) -> Result<ExportedDecisionRow> {
26    serde_json::from_value(value).map_err(|e| Error::Decode(e.to_string()))
27}
28
29impl Export {
30    pub(crate) fn new(t: Arc<HttpTransport>) -> Self {
31        Self { t }
32    }
33
34    /// Stream decision rows from the JSONL export. The terminal trailer
35    /// record is skipped — use [`Export::decisions_with_trailer`] to
36    /// capture it. The request is issued on first poll.
37    pub fn decisions(
38        &self,
39        params: ExportDecisionsParams,
40        req: Option<RequestOptions>,
41    ) -> impl Stream<Item = Result<ExportedDecisionRow>> + Send + 'static {
42        let t = self.t.clone();
43        async_stream::try_stream! {
44            let lines = t
45                .stream_lines(
46                    Method::GET,
47                    ENDPOINT_EXPORT_DECISIONS,
48                    &params.query(),
49                    req.as_ref(),
50                )
51                .await?;
52            futures::pin_mut!(lines);
53            while let Some(line) = lines.next().await {
54                let value = line?;
55                if is_trailer(&value) {
56                    continue;
57                }
58                yield parse_row(value)?;
59            }
60        }
61    }
62
63    /// Stream decision rows and capture the trailer. Drive the returned
64    /// [`DecisionExportStream`] to completion, then call
65    /// [`DecisionExportStream::trailer`] for the summary.
66    pub fn decisions_with_trailer(
67        &self,
68        params: ExportDecisionsParams,
69        req: Option<RequestOptions>,
70    ) -> DecisionExportStream {
71        let t = self.t.clone();
72        let trailer: Arc<Mutex<Option<ExportTrailer>>> = Arc::new(Mutex::new(None));
73        let sink = trailer.clone();
74        let inner = async_stream::try_stream! {
75            let lines = t
76                .stream_lines(
77                    Method::GET,
78                    ENDPOINT_EXPORT_DECISIONS,
79                    &params.query(),
80                    req.as_ref(),
81                )
82                .await?;
83            futures::pin_mut!(lines);
84            while let Some(line) = lines.next().await {
85                let value = line?;
86                if is_trailer(&value) {
87                    if let Ok(parsed) = serde_json::from_value::<ExportTrailer>(value) {
88                        if let Ok(mut slot) = sink.lock() {
89                            *slot = Some(parsed);
90                        }
91                    }
92                    continue;
93                }
94                yield parse_row(value)?;
95            }
96        };
97        DecisionExportStream {
98            inner: inner.boxed(),
99            trailer,
100        }
101    }
102}
103
104/// An iterable export that also captures the trailer record.
105///
106/// Implements [`Stream`] over [`ExportedDecisionRow`]. After the stream is
107/// fully consumed, [`DecisionExportStream::trailer`] returns the summary
108/// (truncation reason / totals), or `None` if the gateway sent no trailer.
109pub struct DecisionExportStream {
110    inner: BoxStream<'static, Result<ExportedDecisionRow>>,
111    trailer: Arc<Mutex<Option<ExportTrailer>>>,
112}
113
114impl DecisionExportStream {
115    /// The export trailer, or `None` if not yet seen / not sent.
116    #[must_use]
117    pub fn trailer(&self) -> Option<ExportTrailer> {
118        self.trailer.lock().ok().and_then(|slot| slot.clone())
119    }
120}
121
122impl Stream for DecisionExportStream {
123    type Item = Result<ExportedDecisionRow>;
124
125    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126        self.inner.poll_next_unpin(cx)
127    }
128}