rnotifylib/
message_router.rs

1use crate::config::Config;
2use crate::destination::routed_destination::{MessageRoutingBehaviour, RoutedDestination};
3use crate::destination::message_condition::MessageCondition;
4use crate::message::Message;
5use crate::send_error::{SendError, SendErrors};
6use serde::{Serialize, Deserialize};
7use crate::send_error::borrowed::SendErrorBorrowed;
8use crate::send_error::owned::SendErrorOwned;
9use crate::send_error::reported::{ErrorReportSummary, ReportedSendError};
10
11pub struct MessageRouter {
12    destinations: Vec<Box<dyn RoutedDestination>>,
13}
14
15impl MessageRouter {
16    pub fn empty() -> Self {
17        Self {
18            destinations: vec![],
19        }
20    }
21
22    pub fn from_config(config: Config) -> Self {
23        let destinations = config.take_destinations().into_iter()
24            .map(|item| {
25                let boxed: Box<dyn RoutedDestination> = Box::new(item);
26                boxed
27            })
28            .collect();
29
30        Self {
31            destinations,
32        }
33    }
34
35    pub fn add_destination(&mut self, destination: Box<dyn RoutedDestination>) {
36        self.destinations.push(destination)
37    }
38
39    pub fn route<'a>(&self, message: &'a Message) -> Result<usize, SendErrors<'a>> {
40        let mut errors: Vec<SendErrorBorrowed<'a>> = vec![];
41
42        let mut sent_to_non_root_dest = false;
43
44        let mut successful = 0;
45
46        fn send_wrap<'a>(destination: &dyn RoutedDestination, message: &'a Message) -> Result<(), SendErrorBorrowed<'a>> {
47            destination.send(message).map_err(|err| {
48                SendErrorBorrowed::create(err, destination.get_id().to_owned(), message)
49            })
50        }
51
52        fn send_to_dest<'a>(dest: &dyn RoutedDestination,
53                            sent_to_non_root_dest: &mut bool,
54                            successful: &mut usize,
55                            errors: &mut Vec<SendErrorBorrowed<'a>>,
56                            message: &'a Message) {
57            match send_wrap(dest, message) {
58                Ok(()) => {
59                    if !dest.is_root() {
60                        *sent_to_non_root_dest = true;
61                    }
62                    *successful += 1;
63                }
64                Err(send_err) => errors.push(send_err),
65            }
66        }
67
68        for dest in self.destinations.iter()
69            .filter(|dest| dest.get_routing_type().always_send_messages())
70            .filter(|dest| dest.should_receive(message)) {
71
72            send_to_dest(dest.as_ref(), &mut sent_to_non_root_dest, &mut successful, &mut errors, message);
73        }
74
75        if !sent_to_non_root_dest {
76            // Find a drain.
77            for dest in self.destinations.iter()
78                .filter(|dest| dest.get_routing_type() == &MessageRoutingBehaviour::Drain)
79                .filter(|dest| dest.should_receive(message)) {
80
81                send_to_dest(dest.as_ref(), &mut sent_to_non_root_dest, &mut successful, &mut errors, message);
82            }
83        }
84
85        if errors.is_empty() {
86            return Ok(successful);
87        }
88
89        let root_destinations: Vec<_> = self.destinations.iter()
90            .filter(|dest| dest.is_root())
91            .map(|dest| dest.to_owned())
92            .collect();
93
94        let reported_errors = errors.into_iter().map(|error| {
95            let report_message = error.create_report_message();
96            let mut any_report_success = false;
97            let mut report_fails = vec![];
98
99            for root_dest in &root_destinations {
100                match root_dest.send(&report_message) {
101                    Ok(_) => {
102                        any_report_success = true;
103                    }
104                    Err(send_err) => {
105                        let send_err = SendErrorOwned::create(send_err, root_dest.get_id().to_owned(), report_message.clone());
106                        report_fails.push(send_err);
107                    }
108                }
109            }
110
111            ReportedSendError::new(error, ErrorReportSummary::new(any_report_success, report_fails))
112        }).collect();
113
114
115        Err(SendErrors::new(message, reported_errors, successful))
116    }
117
118}
119
120impl Default for MessageRouter {
121    fn default() -> Self {
122        Self::empty()
123    }
124}
125
126/// Controls whether a Message should be sent to a destination.
127///
128/// It does this through two "filters"
129/// - [`MessageRoutingBehaviour`]
130/// - A list of [`MessageCondition`]s
131///
132/// # [`MessageRoutingBehaviour`] #
133/// This filter is interdependent with the where the message has already be sent.
134/// See the documentation for it for more details.
135///
136/// # [`MessageCondition`]s #
137/// If none specified, all messages are allowed.
138/// Otherwise it acts like a whitelist.
139#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
140pub struct RoutingInfo {
141    // Whether errors with sending notifications will be reported to this destination.
142    #[serde(default)]
143    routing_type: MessageRoutingBehaviour,
144    #[serde(default, skip_serializing_if = "Vec::is_empty")]
145    whitelist: Vec<MessageCondition>,
146}
147
148impl RoutingInfo {
149    pub fn of(routing_type: MessageRoutingBehaviour) -> Self {
150        Self {
151            routing_type,
152            whitelist: vec![]
153        }
154    }
155
156    pub fn root() -> Self {
157        Self::of(MessageRoutingBehaviour::Root)
158    }
159
160    pub fn get_routing_behaviour(&self) -> &MessageRoutingBehaviour {
161        &self.routing_type
162    }
163
164    pub fn applies_to(&self, message: &Message) -> bool {
165        if self.whitelist.is_empty() {
166            return true;
167        }
168        self.whitelist.iter()
169            .any(|condition| condition.matches(message))
170    }
171}