use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::{ready, FutureExt, Stream, StreamExt};
use tonic::{metadata::MetadataMap, Status, Streaming};
pub fn extract_lazy_trailers<T>(s: Streaming<T>) -> (ExtractTrailersStream<T>, LazyTrailers) {
let trailers: SharedTrailers = Default::default();
let stream = ExtractTrailersStream {
inner: s,
trailers: Arc::clone(&trailers),
};
let lazy_trailers = LazyTrailers { trailers };
(stream, lazy_trailers)
}
type SharedTrailers = Arc<Mutex<Option<MetadataMap>>>;
#[derive(Debug)]
pub struct ExtractTrailersStream<T> {
inner: Streaming<T>,
trailers: SharedTrailers,
}
impl<T> Stream for ExtractTrailersStream<T> {
type Item = Result<T, Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let res = ready!(self.inner.poll_next_unpin(cx));
if res.is_none() {
if let Some(trailers) = self
.inner
.trailers()
.now_or_never()
.and_then(|res| res.ok())
.flatten()
{
*self.trailers.lock().expect("poisoned") = Some(trailers);
}
}
Poll::Ready(res)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
#[derive(Debug)]
pub struct LazyTrailers {
trailers: SharedTrailers,
}
impl LazyTrailers {
pub fn get(&self) -> Option<MetadataMap> {
self.trailers.lock().expect("poisoned").clone()
}
}