use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use reqwest::Method;
use serde_json::Value;
use crate::constants::ENDPOINT_EXPORT_DECISIONS;
use crate::error::{Error, Result};
use crate::http::HttpTransport;
use crate::options::RequestOptions;
use crate::types::{ExportDecisionsParams, ExportTrailer, ExportedDecisionRow};
pub struct Export {
t: Arc<HttpTransport>,
}
fn is_trailer(value: &Value) -> bool {
value.get("trailer").and_then(Value::as_bool) == Some(true)
}
fn parse_row(value: Value) -> Result<ExportedDecisionRow> {
serde_json::from_value(value).map_err(|e| Error::Decode(e.to_string()))
}
impl Export {
pub(crate) fn new(t: Arc<HttpTransport>) -> Self {
Self { t }
}
pub fn decisions(
&self,
params: ExportDecisionsParams,
req: Option<RequestOptions>,
) -> impl Stream<Item = Result<ExportedDecisionRow>> + Send + 'static {
let t = self.t.clone();
async_stream::try_stream! {
let lines = t
.stream_lines(
Method::GET,
ENDPOINT_EXPORT_DECISIONS,
¶ms.query(),
req.as_ref(),
)
.await?;
futures::pin_mut!(lines);
while let Some(line) = lines.next().await {
let value = line?;
if is_trailer(&value) {
continue;
}
yield parse_row(value)?;
}
}
}
pub fn decisions_with_trailer(
&self,
params: ExportDecisionsParams,
req: Option<RequestOptions>,
) -> DecisionExportStream {
let t = self.t.clone();
let trailer: Arc<Mutex<Option<ExportTrailer>>> = Arc::new(Mutex::new(None));
let sink = trailer.clone();
let inner = async_stream::try_stream! {
let lines = t
.stream_lines(
Method::GET,
ENDPOINT_EXPORT_DECISIONS,
¶ms.query(),
req.as_ref(),
)
.await?;
futures::pin_mut!(lines);
while let Some(line) = lines.next().await {
let value = line?;
if is_trailer(&value) {
if let Ok(parsed) = serde_json::from_value::<ExportTrailer>(value) {
if let Ok(mut slot) = sink.lock() {
*slot = Some(parsed);
}
}
continue;
}
yield parse_row(value)?;
}
};
DecisionExportStream {
inner: inner.boxed(),
trailer,
}
}
}
pub struct DecisionExportStream {
inner: BoxStream<'static, Result<ExportedDecisionRow>>,
trailer: Arc<Mutex<Option<ExportTrailer>>>,
}
impl DecisionExportStream {
#[must_use]
pub fn trailer(&self) -> Option<ExportTrailer> {
self.trailer.lock().ok().and_then(|slot| slot.clone())
}
}
impl Stream for DecisionExportStream {
type Item = Result<ExportedDecisionRow>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}