dptree/handler/
filter.rs

1use crate::{
2    di::{Asyncify, Injectable},
3    from_fn_with_description,
4    handler::core::Handler,
5    HandlerDescription, HandlerSignature,
6};
7
8use std::{collections::BTreeSet, ops::ControlFlow, sync::Arc};
9
10/// Constructs a handler that filters input with the predicate `pred`.
11///
12/// `pred` has an access to all values that are stored in the input container.
13/// If it returns `true`, a continuation of the handler will be called,
14/// otherwise the handler returns [`ControlFlow::Continue`].
15#[must_use]
16#[track_caller]
17pub fn filter<'a, Pred, Output, FnArgs, Descr>(pred: Pred) -> Handler<'a, Output, Descr>
18where
19    Asyncify<Pred>: Injectable<bool, FnArgs> + Send + Sync + 'a,
20    Output: 'a,
21    Descr: HandlerDescription,
22{
23    filter_with_description(Descr::filter(), pred)
24}
25
26/// The asynchronous version of [`filter`].
27#[must_use]
28#[track_caller]
29pub fn filter_async<'a, Pred, Output, FnArgs, Descr>(pred: Pred) -> Handler<'a, Output, Descr>
30where
31    Pred: Injectable<bool, FnArgs> + Send + Sync + 'a,
32    Output: 'a,
33    Descr: HandlerDescription,
34{
35    filter_async_with_description(Descr::filter_async(), pred)
36}
37
38/// [`filter`] with a custom description.
39#[must_use]
40#[track_caller]
41pub fn filter_with_description<'a, Pred, Output, FnArgs, Descr>(
42    description: Descr,
43    pred: Pred,
44) -> Handler<'a, Output, Descr>
45where
46    Asyncify<Pred>: Injectable<bool, FnArgs> + Send + Sync + 'a,
47    Output: 'a,
48{
49    filter_async_with_description(description, Asyncify(pred))
50}
51
52/// [`filter_async`] with a custom description.
53#[must_use]
54#[track_caller]
55pub fn filter_async_with_description<'a, Pred, Output, FnArgs, Descr>(
56    description: Descr,
57    pred: Pred,
58) -> Handler<'a, Output, Descr>
59where
60    Pred: Injectable<bool, FnArgs> + Send + Sync + 'a,
61    Output: 'a,
62{
63    let pred = Arc::new(pred);
64
65    from_fn_with_description(
66        description,
67        move |event, cont| {
68            let pred = Arc::clone(&pred);
69
70            async move {
71                let pred = pred.inject(&event);
72                let cond = pred().await;
73                drop(pred);
74
75                if cond {
76                    cont(event).await
77                } else {
78                    ControlFlow::Continue(event)
79                }
80            }
81        },
82        HandlerSignature::Other {
83            obligations: Pred::obligations(),
84            guaranteed_outcomes: BTreeSet::default(),
85            conditional_outcomes: BTreeSet::default(),
86        },
87    )
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use crate::{deps, help_inference};
94
95    #[tokio::test]
96    async fn test_filter() {
97        let input_value = 123;
98        let input = deps![input_value];
99        let output = 7;
100
101        let result = help_inference(filter_async(move |event: i32| async move {
102            assert_eq!(event, input_value);
103            true
104        }))
105        .endpoint(move |event: i32| async move {
106            assert_eq!(event, input_value);
107            output
108        })
109        .dispatch(input)
110        .await;
111
112        assert!(result == ControlFlow::Break(output));
113    }
114
115    #[tokio::test]
116    async fn test_and_then_filter() {
117        let input = 123;
118        let output = 7;
119
120        let result = help_inference(filter(move |event: i32| {
121            assert_eq!(event, input);
122            true
123        }))
124        .chain(
125            filter_async(move |event: i32| async move {
126                assert_eq!(event, input);
127                true
128            })
129            .endpoint(move |event: i32| async move {
130                assert_eq!(event, input);
131                output
132            }),
133        )
134        .dispatch(deps![input])
135        .await;
136
137        assert!(result == ControlFlow::Break(output));
138    }
139}