arrows/routing/messenger.rs
//!`Messenger` sends out messages to a remote or to the local system. It tries to boot up
//!the listener binary in case of a local connection failure.
//!
//!Uses a tcp client which serializes collections of messages. Each collection of messages
//!ends with byte marks <https://github.com/ratulb/byte_marks>. This is how the receiving
//!end reconstructs the messages. Message serialization and deserialization are based on
//!the [bincode](https://github.com/bincode-org/bincode) library.
8
9use crate::common::config::Config;
10use crate::routing::messenger::client::Client;
11use crate::{Action, Addr, Error::MsgSendError, Mail, Msg, Result};
12use std::collections::HashMap;
13use std::io::ErrorKind::ConnectionRefused;
14use std::net::SocketAddr;
15use std::process::Command;
16use std::thread;
17use std::time::Duration;
18
///The client face of the actor system. Sends out messages with text or binary (v8)
///payloads.
///
///`Messenger` carries no state: every operation is an associated function, so the type
///is never instantiated by the code in this module.
#[derive(Debug)]
pub struct Messenger;
21
22impl Messenger {
23 ///This function is responsible for dispatching messages received from the macro
24 ///invocation of `send!`. Multiple messages can be grouped for one more actors in one
25 ///`send!` macro invocation as shown below:
26 ///
27 ///Example
28 ///
29 ///```
30 ///use arrows::send;
31 ///use arrows::Msg;
32 ///
33 ///let m1 = Msg::with_text("Message to actor1");
34 ///let m2 = Msg::with_text("Message to actor1");
35 ///let m3 = Msg::with_text("Message to actor2");
36 ///let m4 = Msg::with_text("Message to actor1");
37 ///let m5 = Msg::with_text("Message to actor1");
38 ///send!("actor1", (m1, m2), "actor2", (m3), "actor1", (m4, m5));
39 ///```
40 ///Grouping within braces is not necessary while sending only to one actor:
41 ///
42 ///```
43 ///let m6 = Msg::with_text("Message to actor3")
44 ///let m7 = Msg::with_text("Message to actor3")
45 ///send!("actor3",m6,m7);
46 ///
47 ///```
48 ///Actors identified with string literal such as 'actor3' is assumed to be running in the
49 ///local system(if they are not running - they would be resurrected).
50 ///
51 ///Actors running in remote systems - need to identified by the `Addr` construct:
52 ///
53 ///```
54 ///use arrows::send;
55 ///use arrows::Msg;
56 ///use arrows::Addr;
57 ///
58 ///let remote_addr1 = Addr::remote("actor1", "10.10.10.10:7171");
59 ///let remote_addr2 = Addr::remote("actor2", "11.11.11.11:8181");
60 ///
61 ///let m1 = Msg::with_text("Message to remote actor1");
62 ///let m2 = Msg::with_text("Message to remote actor1");
63 ///let m3 = Msg::with_text("Message to remote actor2");
64 ///let m4 = Msg::with_text("Message to remote actor2");
65 ///
66 ///send!(remote_addr1, (m1,m2), remote_addr2, (m3,m4));
67 ///
68 ///```
69 ///Messages for each actor will always be delivered in the order they are ingested into
70 ///the system. Actor will not process out of sequence message. To prevent loss, messages
71 ///are persisted into an embedded store backed by highly performant sqlite db.
72 ///
73 ///
74 ///A new implementation of actor may be swapped in - replaching an actively running actor
75 ///in the system. Swapped in actor would take over from an outgoing actor and start
76 ///processing messages from where the outgoing left off. An actor would never process an
77 ///out of sequence message i.e. it would never increment its message sequence counter until
78 ///it has successfully processed the received message.
79 ///
80 ///Actors can change their behaviour while still running. They can create other actors
81 ///copies of themselves.
82 ///
83 ///Actors are allowed to panic a set number of times(currently 3).
84 ///
85
86 pub fn send(mut mails: HashMap<&Addr, Vec<Msg>>) -> Result<()> {
87 mails.iter_mut().for_each(|(addr, msgs)| {
88 msgs.iter_mut().for_each(|msg| {
89 msg.set_recipient_addr(addr);
90 });
91 if let Some(host_addr) = addr.get_socket_addr() {
92 match Client::connect(host_addr) {
93 Ok(mut client) => {
94 if let Err(err) = client.send(msgs) {
95 println!("{}", err);
96 } else {
97 println!("Messages sent to host {}", host_addr);
98 }
99 }
100 Err(err) => {
101 eprintln!("Host: {} {}", host_addr, err);
102 let _rs = Self::handle_err(msgs, host_addr, err);
103 }
104 }
105 }
106 });
107 Ok(())
108 }
109
110 pub(crate) fn mail(mail: Mail) -> Result<()> {
111 Self::group_by(mail.take_all())
112 .into_iter()
113 .for_each(|(host_addr, mut msgs)| {
114 let _rs = match Client::connect(host_addr) {
115 Ok(mut client) => match client.send(&mut msgs) {
116 Ok(ok) => {
117 println!("Messages sent to {}", host_addr);
118 Ok(ok)
119 }
120 Err(err) => {
121 eprintln!("{}", err);
122 Err(err.into())
123 }
124 },
125 Err(err) => {
126 eprintln!("Host: {} {}", host_addr, err);
127 Self::handle_err(&mut msgs, host_addr, err)
128 }
129 };
130 });
131 Ok(())
132 }
133
134 fn group_by(msgs: Vec<Msg>) -> HashMap<SocketAddr, Vec<Msg>> {
135 let mut groups: HashMap<SocketAddr, Vec<Msg>> = HashMap::new();
136 for msg in msgs {
137 let host_addr = match msg.get_to() {
138 Some(ref addr) => addr.get_socket_addr().expect("host"),
139 None => continue,
140 };
141 groups.entry(host_addr).or_default().push(msg);
142 }
143 groups
144 }
145
146 fn handle_err(msgs: &mut Vec<Msg>, addr: SocketAddr, err: std::io::Error) -> Result<()> {
147 match err.kind() {
148 ConnectionRefused => Self::try_bootup_and_resend(msgs, addr),
149 _ => {
150 eprintln!("{}", err);
151 Err(MsgSendError(err))
152 }
153 }
154 }
155 fn try_bootup_and_resend(msgs: &mut Vec<Msg>, socket_addr: SocketAddr) -> Result<()> {
156 if Addr::is_ip_local(socket_addr.ip()) {
157 if !msgs.is_empty()
158 && (msgs[0].command_equals(Action::Shutdown)
159 || msgs[0].command_equals(Action::Echo("".to_string())))
160 {
161 return Ok(());
162 } else {
163 if let Err(err) = Self::bootup() {
164 eprintln!("Bootup error {:?}", err);
165 }
166 let mail: Mail = std::mem::take(msgs).into();
167 thread::sleep(Duration::from_millis(100));
168 if let Err(err) = Messenger::mail(mail) {
169 eprintln!("Messenger mail error {:?}", err);
170 }
171 }
172 }
173 Ok(())
174 }
175
176 ///Boots up the msg listener(`MessageListener`) binary. The resident binary path is
177 ///configurable via the environment variable 'resident_listener'.
178
179 pub fn bootup() -> Result<()> {
180 let mut resident_listener = std::env::current_dir()?;
181 resident_listener.push(Config::get_shared().resident_listener());
182 let path = resident_listener.as_path().to_str();
183 match path {
184 Some(path) => {
185 Command::new(path).spawn()?;
186 }
187 None => eprintln!("Listener binary could not be found in {:?}", path),
188 }
189 Ok(())
190 }
191}
192
193pub(super) mod client {
194
195 use crate::{option_of_bytes, Mail, Msg};
196 use byte_marks::ByteMarker;
197
198 use std::io::{BufReader, BufWriter, Error, ErrorKind, Read, Result, Write};
199 use std::net::{TcpStream, ToSocketAddrs};
200
201 pub struct Client<'a> {
202 reader: BufReader<TcpStream>,
203 writer: BufWriter<TcpStream>,
204 marker: ByteMarker<'a>,
205 }
206
207 impl Client<'_> {
208 pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
209 let stream = TcpStream::connect(addr)?;
210 let write_half = stream.try_clone()?;
211 Ok(Client {
212 reader: BufReader::new(stream),
213 writer: BufWriter::new(write_half),
214 marker: ByteMarker::with_defaults(),
215 })
216 }
217
218 pub fn send(&mut self, msgs: &mut Vec<Msg>) -> Result<()> {
219 let bulk = Mail::Bulk(std::mem::take(msgs));
220 match option_of_bytes(&bulk) {
221 Some(ref mut bytes) => {
222 self.marker.mark_tail(bytes);
223 self.writer.write_all(bytes)?;
224 self.writer.flush()?;
225 let mut buf = vec![0; 256];
226 let len = self.reader.read(&mut buf)?;
227 println!("{}", String::from_utf8_lossy(&buf[..len]));
228 Ok(())
229 }
230 None => {
231 eprintln!("Error converting message to bytes");
232 Err(Error::new(
233 ErrorKind::Other,
234 "Error converting message to bytes",
235 ))
236 }
237 }
238 }
239 }
240}