1use crate::{
2 di::{Asyncify, Injectable},
3 from_fn_with_description,
4 handler::core::Handler,
5 HandlerDescription, HandlerSignature,
6};
7
8use std::{collections::BTreeSet, ops::ControlFlow, sync::Arc};
9
10#[must_use]
16#[track_caller]
17pub fn filter<'a, Pred, Output, FnArgs, Descr>(pred: Pred) -> Handler<'a, Output, Descr>
18where
19 Asyncify<Pred>: Injectable<bool, FnArgs> + Send + Sync + 'a,
20 Output: 'a,
21 Descr: HandlerDescription,
22{
23 filter_with_description(Descr::filter(), pred)
24}
25
26#[must_use]
28#[track_caller]
29pub fn filter_async<'a, Pred, Output, FnArgs, Descr>(pred: Pred) -> Handler<'a, Output, Descr>
30where
31 Pred: Injectable<bool, FnArgs> + Send + Sync + 'a,
32 Output: 'a,
33 Descr: HandlerDescription,
34{
35 filter_async_with_description(Descr::filter_async(), pred)
36}
37
38#[must_use]
40#[track_caller]
41pub fn filter_with_description<'a, Pred, Output, FnArgs, Descr>(
42 description: Descr,
43 pred: Pred,
44) -> Handler<'a, Output, Descr>
45where
46 Asyncify<Pred>: Injectable<bool, FnArgs> + Send + Sync + 'a,
47 Output: 'a,
48{
49 filter_async_with_description(description, Asyncify(pred))
50}
51
52#[must_use]
54#[track_caller]
55pub fn filter_async_with_description<'a, Pred, Output, FnArgs, Descr>(
56 description: Descr,
57 pred: Pred,
58) -> Handler<'a, Output, Descr>
59where
60 Pred: Injectable<bool, FnArgs> + Send + Sync + 'a,
61 Output: 'a,
62{
63 let pred = Arc::new(pred);
64
65 from_fn_with_description(
66 description,
67 move |event, cont| {
68 let pred = Arc::clone(&pred);
69
70 async move {
71 let pred = pred.inject(&event);
72 let cond = pred().await;
73 drop(pred);
74
75 if cond {
76 cont(event).await
77 } else {
78 ControlFlow::Continue(event)
79 }
80 }
81 },
82 HandlerSignature::Other {
83 obligations: Pred::obligations(),
84 guaranteed_outcomes: BTreeSet::default(),
85 conditional_outcomes: BTreeSet::default(),
86 },
87 )
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use crate::{deps, help_inference};
94
95 #[tokio::test]
96 async fn test_filter() {
97 let input_value = 123;
98 let input = deps![input_value];
99 let output = 7;
100
101 let result = help_inference(filter_async(move |event: i32| async move {
102 assert_eq!(event, input_value);
103 true
104 }))
105 .endpoint(move |event: i32| async move {
106 assert_eq!(event, input_value);
107 output
108 })
109 .dispatch(input)
110 .await;
111
112 assert!(result == ControlFlow::Break(output));
113 }
114
115 #[tokio::test]
116 async fn test_and_then_filter() {
117 let input = 123;
118 let output = 7;
119
120 let result = help_inference(filter(move |event: i32| {
121 assert_eq!(event, input);
122 true
123 }))
124 .chain(
125 filter_async(move |event: i32| async move {
126 assert_eq!(event, input);
127 true
128 })
129 .endpoint(move |event: i32| async move {
130 assert_eq!(event, input);
131 output
132 }),
133 )
134 .dispatch(deps![input])
135 .await;
136
137 assert!(result == ControlFlow::Break(output));
138 }
139}