amadeus_core/par_stream/
inspect.rs

1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use serde_closure::traits::FnMut;
6use std::{
7	pin::Pin, task::{Context, Poll}
8};
9
10use super::{ParallelPipe, ParallelStream, PipeTask, StreamTask};
11use crate::pipe::Pipe;
12
13#[pin_project]
14#[derive(new)]
15#[must_use]
16pub struct Inspect<P, F> {
17	#[pin]
18	pipe: P,
19	f: F,
20}
21
22impl_par_dist! {
23	impl<P: ParallelStream, F> ParallelStream for Inspect<P, F>
24	where
25		F: for<'a> FnMut<(&'a P::Item,), Output = ()> + Clone + Send + 'static,
26	{
27		type Item = P::Item;
28		type Task = InspectTask<P::Task, F>;
29
30		fn size_hint(&self) -> (usize, Option<usize>) {
31			self.pipe.size_hint()
32		}
33		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
34			let self_ = self.project();
35			let f = self_.f;
36			self_.pipe.next_task(cx).map(|task| {
37				task.map(|task| {
38					let f = f.clone();
39					InspectTask { task, f }
40				})
41			})
42		}
43	}
44
45	impl<P: ParallelPipe<Input>, F, Input> ParallelPipe<Input> for Inspect<P, F>
46	where
47		F: for<'a> FnMut<(&'a P::Output,), Output = ()> + Clone + Send + 'static,
48	{
49		type Output = P::Output;
50		type Task = InspectTask<P::Task, F>;
51
52		fn task(&self) -> Self::Task {
53			let task = self.pipe.task();
54			let f = self.f.clone();
55			InspectTask { task, f }
56		}
57	}
58}
59
60#[pin_project]
61#[derive(Serialize, Deserialize)]
62pub struct InspectTask<T, F> {
63	#[pin]
64	task: T,
65	f: F,
66}
67
68impl<C: StreamTask, F> StreamTask for InspectTask<C, F>
69where
70	F: for<'a> FnMut<(&'a C::Item,), Output = ()> + Clone,
71{
72	type Item = C::Item;
73	type Async = InspectTask<C::Async, F>;
74
75	fn into_async(self) -> Self::Async {
76		InspectTask {
77			task: self.task.into_async(),
78			f: self.f,
79		}
80	}
81}
82impl<C: PipeTask<Input>, F, Input> PipeTask<Input> for InspectTask<C, F>
83where
84	F: for<'a> FnMut<(&'a C::Output,), Output = ()> + Clone,
85{
86	type Output = C::Output;
87	type Async = InspectTask<C::Async, F>;
88
89	fn into_async(self) -> Self::Async {
90		InspectTask {
91			task: self.task.into_async(),
92			f: self.f,
93		}
94	}
95}
96
97impl<C: Stream, F> Stream for InspectTask<C, F>
98where
99	F: for<'a> FnMut<(&'a C::Item,), Output = ()> + Clone,
100{
101	type Item = C::Item;
102
103	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104		let mut self_ = self.project();
105		let (task, f) = (self_.task, &mut self_.f);
106		task.poll_next(cx).map(|item| {
107			item.map(|item| {
108				f.call_mut((&item,));
109				item
110			})
111		})
112	}
113}
114
115impl<C: Pipe<Input>, F, Input> Pipe<Input> for InspectTask<C, F>
116where
117	F: for<'a> FnMut<(&'a C::Output,), Output = ()> + Clone,
118{
119	type Output = C::Output;
120
121	fn poll_next(
122		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
123	) -> Poll<Option<Self::Output>> {
124		let mut self_ = self.project();
125		let (task, f) = (self_.task, &mut self_.f);
126		task.poll_next(cx, stream).map(|item| {
127			item.map(|item| {
128				f.call_mut((&item,));
129				item
130			})
131		})
132	}
133}