amadeus_core/par_stream/
inspect.rs1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use serde_closure::traits::FnMut;
6use std::{
7 pin::Pin, task::{Context, Poll}
8};
9
10use super::{ParallelPipe, ParallelStream, PipeTask, StreamTask};
11use crate::pipe::Pipe;
12
13#[pin_project]
14#[derive(new)]
15#[must_use]
16pub struct Inspect<P, F> {
17 #[pin]
18 pipe: P,
19 f: F,
20}
21
22impl_par_dist! {
23 impl<P: ParallelStream, F> ParallelStream for Inspect<P, F>
24 where
25 F: for<'a> FnMut<(&'a P::Item,), Output = ()> + Clone + Send + 'static,
26 {
27 type Item = P::Item;
28 type Task = InspectTask<P::Task, F>;
29
30 fn size_hint(&self) -> (usize, Option<usize>) {
31 self.pipe.size_hint()
32 }
33 fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
34 let self_ = self.project();
35 let f = self_.f;
36 self_.pipe.next_task(cx).map(|task| {
37 task.map(|task| {
38 let f = f.clone();
39 InspectTask { task, f }
40 })
41 })
42 }
43 }
44
45 impl<P: ParallelPipe<Input>, F, Input> ParallelPipe<Input> for Inspect<P, F>
46 where
47 F: for<'a> FnMut<(&'a P::Output,), Output = ()> + Clone + Send + 'static,
48 {
49 type Output = P::Output;
50 type Task = InspectTask<P::Task, F>;
51
52 fn task(&self) -> Self::Task {
53 let task = self.pipe.task();
54 let f = self.f.clone();
55 InspectTask { task, f }
56 }
57 }
58}
59
60#[pin_project]
61#[derive(Serialize, Deserialize)]
62pub struct InspectTask<T, F> {
63 #[pin]
64 task: T,
65 f: F,
66}
67
68impl<C: StreamTask, F> StreamTask for InspectTask<C, F>
69where
70 F: for<'a> FnMut<(&'a C::Item,), Output = ()> + Clone,
71{
72 type Item = C::Item;
73 type Async = InspectTask<C::Async, F>;
74
75 fn into_async(self) -> Self::Async {
76 InspectTask {
77 task: self.task.into_async(),
78 f: self.f,
79 }
80 }
81}
82impl<C: PipeTask<Input>, F, Input> PipeTask<Input> for InspectTask<C, F>
83where
84 F: for<'a> FnMut<(&'a C::Output,), Output = ()> + Clone,
85{
86 type Output = C::Output;
87 type Async = InspectTask<C::Async, F>;
88
89 fn into_async(self) -> Self::Async {
90 InspectTask {
91 task: self.task.into_async(),
92 f: self.f,
93 }
94 }
95}
96
97impl<C: Stream, F> Stream for InspectTask<C, F>
98where
99 F: for<'a> FnMut<(&'a C::Item,), Output = ()> + Clone,
100{
101 type Item = C::Item;
102
103 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104 let mut self_ = self.project();
105 let (task, f) = (self_.task, &mut self_.f);
106 task.poll_next(cx).map(|item| {
107 item.map(|item| {
108 f.call_mut((&item,));
109 item
110 })
111 })
112 }
113}
114
115impl<C: Pipe<Input>, F, Input> Pipe<Input> for InspectTask<C, F>
116where
117 F: for<'a> FnMut<(&'a C::Output,), Output = ()> + Clone,
118{
119 type Output = C::Output;
120
121 fn poll_next(
122 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
123 ) -> Poll<Option<Self::Output>> {
124 let mut self_ = self.project();
125 let (task, f) = (self_.task, &mut self_.f);
126 task.poll_next(cx, stream).map(|item| {
127 item.map(|item| {
128 f.call_mut((&item,));
129 item
130 })
131 })
132 }
133}