rnotifylib/
message_router.rs1use crate::config::Config;
2use crate::destination::routed_destination::{MessageRoutingBehaviour, RoutedDestination};
3use crate::destination::message_condition::MessageCondition;
4use crate::message::Message;
5use crate::send_error::{SendError, SendErrors};
6use serde::{Serialize, Deserialize};
7use crate::send_error::borrowed::SendErrorBorrowed;
8use crate::send_error::owned::SendErrorOwned;
9use crate::send_error::reported::{ErrorReportSummary, ReportedSendError};
10
11pub struct MessageRouter {
12 destinations: Vec<Box<dyn RoutedDestination>>,
13}
14
15impl MessageRouter {
16 pub fn empty() -> Self {
17 Self {
18 destinations: vec![],
19 }
20 }
21
22 pub fn from_config(config: Config) -> Self {
23 let destinations = config.take_destinations().into_iter()
24 .map(|item| {
25 let boxed: Box<dyn RoutedDestination> = Box::new(item);
26 boxed
27 })
28 .collect();
29
30 Self {
31 destinations,
32 }
33 }
34
35 pub fn add_destination(&mut self, destination: Box<dyn RoutedDestination>) {
36 self.destinations.push(destination)
37 }
38
39 pub fn route<'a>(&self, message: &'a Message) -> Result<usize, SendErrors<'a>> {
40 let mut errors: Vec<SendErrorBorrowed<'a>> = vec![];
41
42 let mut sent_to_non_root_dest = false;
43
44 let mut successful = 0;
45
46 fn send_wrap<'a>(destination: &dyn RoutedDestination, message: &'a Message) -> Result<(), SendErrorBorrowed<'a>> {
47 destination.send(message).map_err(|err| {
48 SendErrorBorrowed::create(err, destination.get_id().to_owned(), message)
49 })
50 }
51
52 fn send_to_dest<'a>(dest: &dyn RoutedDestination,
53 sent_to_non_root_dest: &mut bool,
54 successful: &mut usize,
55 errors: &mut Vec<SendErrorBorrowed<'a>>,
56 message: &'a Message) {
57 match send_wrap(dest, message) {
58 Ok(()) => {
59 if !dest.is_root() {
60 *sent_to_non_root_dest = true;
61 }
62 *successful += 1;
63 }
64 Err(send_err) => errors.push(send_err),
65 }
66 }
67
68 for dest in self.destinations.iter()
69 .filter(|dest| dest.get_routing_type().always_send_messages())
70 .filter(|dest| dest.should_receive(message)) {
71
72 send_to_dest(dest.as_ref(), &mut sent_to_non_root_dest, &mut successful, &mut errors, message);
73 }
74
75 if !sent_to_non_root_dest {
76 for dest in self.destinations.iter()
78 .filter(|dest| dest.get_routing_type() == &MessageRoutingBehaviour::Drain)
79 .filter(|dest| dest.should_receive(message)) {
80
81 send_to_dest(dest.as_ref(), &mut sent_to_non_root_dest, &mut successful, &mut errors, message);
82 }
83 }
84
85 if errors.is_empty() {
86 return Ok(successful);
87 }
88
89 let root_destinations: Vec<_> = self.destinations.iter()
90 .filter(|dest| dest.is_root())
91 .map(|dest| dest.to_owned())
92 .collect();
93
94 let reported_errors = errors.into_iter().map(|error| {
95 let report_message = error.create_report_message();
96 let mut any_report_success = false;
97 let mut report_fails = vec![];
98
99 for root_dest in &root_destinations {
100 match root_dest.send(&report_message) {
101 Ok(_) => {
102 any_report_success = true;
103 }
104 Err(send_err) => {
105 let send_err = SendErrorOwned::create(send_err, root_dest.get_id().to_owned(), report_message.clone());
106 report_fails.push(send_err);
107 }
108 }
109 }
110
111 ReportedSendError::new(error, ErrorReportSummary::new(any_report_success, report_fails))
112 }).collect();
113
114
115 Err(SendErrors::new(message, reported_errors, successful))
116 }
117
118}
119
120impl Default for MessageRouter {
121 fn default() -> Self {
122 Self::empty()
123 }
124}
125
126#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
140pub struct RoutingInfo {
141 #[serde(default)]
143 routing_type: MessageRoutingBehaviour,
144 #[serde(default, skip_serializing_if = "Vec::is_empty")]
145 whitelist: Vec<MessageCondition>,
146}
147
148impl RoutingInfo {
149 pub fn of(routing_type: MessageRoutingBehaviour) -> Self {
150 Self {
151 routing_type,
152 whitelist: vec![]
153 }
154 }
155
156 pub fn root() -> Self {
157 Self::of(MessageRoutingBehaviour::Root)
158 }
159
160 pub fn get_routing_behaviour(&self) -> &MessageRoutingBehaviour {
161 &self.routing_type
162 }
163
164 pub fn applies_to(&self, message: &Message) -> bool {
165 if self.whitelist.is_empty() {
166 return true;
167 }
168 self.whitelist.iter()
169 .any(|condition| condition.matches(message))
170 }
171}