// floopy/resources/export.rs
use std::pin::Pin;
2use std::sync::{Arc, Mutex};
3use std::task::{Context, Poll};
4
5use futures::stream::BoxStream;
6use futures::{Stream, StreamExt};
7use reqwest::Method;
8use serde_json::Value;
9
10use crate::constants::ENDPOINT_EXPORT_DECISIONS;
11use crate::error::{Error, Result};
12use crate::http::HttpTransport;
13use crate::options::RequestOptions;
14use crate::types::{ExportDecisionsParams, ExportTrailer, ExportedDecisionRow};
15
16pub struct Export {
18 t: Arc<HttpTransport>,
19}
20
21fn is_trailer(value: &Value) -> bool {
22 value.get("trailer").and_then(Value::as_bool) == Some(true)
23}
24
25fn parse_row(value: Value) -> Result<ExportedDecisionRow> {
26 serde_json::from_value(value).map_err(|e| Error::Decode(e.to_string()))
27}
28
29impl Export {
30 pub(crate) fn new(t: Arc<HttpTransport>) -> Self {
31 Self { t }
32 }
33
34 pub fn decisions(
38 &self,
39 params: ExportDecisionsParams,
40 req: Option<RequestOptions>,
41 ) -> impl Stream<Item = Result<ExportedDecisionRow>> + Send + 'static {
42 let t = self.t.clone();
43 async_stream::try_stream! {
44 let lines = t
45 .stream_lines(
46 Method::GET,
47 ENDPOINT_EXPORT_DECISIONS,
48 ¶ms.query(),
49 req.as_ref(),
50 )
51 .await?;
52 futures::pin_mut!(lines);
53 while let Some(line) = lines.next().await {
54 let value = line?;
55 if is_trailer(&value) {
56 continue;
57 }
58 yield parse_row(value)?;
59 }
60 }
61 }
62
63 pub fn decisions_with_trailer(
67 &self,
68 params: ExportDecisionsParams,
69 req: Option<RequestOptions>,
70 ) -> DecisionExportStream {
71 let t = self.t.clone();
72 let trailer: Arc<Mutex<Option<ExportTrailer>>> = Arc::new(Mutex::new(None));
73 let sink = trailer.clone();
74 let inner = async_stream::try_stream! {
75 let lines = t
76 .stream_lines(
77 Method::GET,
78 ENDPOINT_EXPORT_DECISIONS,
79 ¶ms.query(),
80 req.as_ref(),
81 )
82 .await?;
83 futures::pin_mut!(lines);
84 while let Some(line) = lines.next().await {
85 let value = line?;
86 if is_trailer(&value) {
87 if let Ok(parsed) = serde_json::from_value::<ExportTrailer>(value) {
88 if let Ok(mut slot) = sink.lock() {
89 *slot = Some(parsed);
90 }
91 }
92 continue;
93 }
94 yield parse_row(value)?;
95 }
96 };
97 DecisionExportStream {
98 inner: inner.boxed(),
99 trailer,
100 }
101 }
102}
103
104pub struct DecisionExportStream {
110 inner: BoxStream<'static, Result<ExportedDecisionRow>>,
111 trailer: Arc<Mutex<Option<ExportTrailer>>>,
112}
113
114impl DecisionExportStream {
115 #[must_use]
117 pub fn trailer(&self) -> Option<ExportTrailer> {
118 self.trailer.lock().ok().and_then(|slot| slot.clone())
119 }
120}
121
122impl Stream for DecisionExportStream {
123 type Item = Result<ExportedDecisionRow>;
124
125 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 self.inner.poll_next_unpin(cx)
127 }
128}