arrows/routing/
messenger.rs

1//!`Messenger` sends out  messages to remote or local system. Tries to boot up
2//!listener binary in case of local connection failure.
3//!
4//!Uses a tcp client which serializes collection of messages. Each collection of messages
5//!ends with byte marks <https://github.com/ratulb/byte_marks>. This is how the receiving
6//!end reconstruct messages back. Message serialization and deserialization is based on
7//![bincode] <https://github.com/bincode-org/bincode> library.
8
9use crate::common::config::Config;
10use crate::routing::messenger::client::Client;
11use crate::{Action, Addr, Error::MsgSendError, Mail, Msg, Result};
12use std::collections::HashMap;
13use std::io::ErrorKind::ConnectionRefused;
14use std::net::SocketAddr;
15use std::process::Command;
16use std::thread;
17use std::time::Duration;
18
19///The client face of actor system. Sends out messages with text or binary(v8) as payload.
20pub struct Messenger;
21
22impl Messenger {
23    ///This function is responsible for dispatching messages received from the macro
24    ///invocation of `send!`. Multiple messages can be grouped for one more actors in one
25    ///`send!` macro invocation as shown below:
26    ///
27    ///Example
28    ///
29    ///```
30    ///use arrows::send;
31    ///use arrows::Msg;
32    ///
33    ///let m1 = Msg::with_text("Message to actor1");
34    ///let m2 = Msg::with_text("Message to actor1");
35    ///let m3 = Msg::with_text("Message to actor2");
36    ///let m4 = Msg::with_text("Message to actor1");
37    ///let m5 = Msg::with_text("Message to actor1");
38    ///send!("actor1", (m1, m2), "actor2", (m3), "actor1", (m4, m5));
39    ///```
40    ///Grouping within braces is not necessary while sending only to one actor:
41    ///
42    ///```
43    ///let m6 = Msg::with_text("Message to actor3")
44    ///let m7 = Msg::with_text("Message to actor3")
45    ///send!("actor3",m6,m7);
46    ///
47    ///```
48    ///Actors identified with string literal such as 'actor3' is assumed to be running in the
49    ///local system(if they are not running - they would be resurrected).
50    ///
51    ///Actors running in remote systems - need to identified by the `Addr` construct:
52    ///
53    ///```
54    ///use arrows::send;
55    ///use arrows::Msg;
56    ///use arrows::Addr;
57    ///
58    ///let remote_addr1 = Addr::remote("actor1", "10.10.10.10:7171");
59    ///let remote_addr2 = Addr::remote("actor2", "11.11.11.11:8181");
60    ///
61    ///let m1 = Msg::with_text("Message to remote actor1");
62    ///let m2 = Msg::with_text("Message to remote actor1");
63    ///let m3 = Msg::with_text("Message to remote actor2");
64    ///let m4 = Msg::with_text("Message to remote actor2");
65    ///
66    ///send!(remote_addr1, (m1,m2), remote_addr2, (m3,m4));
67    ///
68    ///```
69    ///Messages for each actor will always be delivered in the order they are ingested into
70    ///the system. Actor will not process out of sequence message. To prevent loss, messages
71    ///are persisted into an embedded store backed by highly performant sqlite db.
72    ///
73    ///
74    ///A new implementation of actor may be swapped in - replaching an actively running actor
75    ///in the system. Swapped in actor would take over from an outgoing actor and start
76    ///processing messages from where the outgoing left off. An actor would never process an
77    ///out of sequence message i.e. it would never increment its message sequence counter until
78    ///it has successfully processed the received message.
79    ///
80    ///Actors can change their behaviour while still running. They can create other actors
81    ///copies of themselves.
82    ///
83    ///Actors are allowed to panic a set number of times(currently 3).
84    ///                                  
85
86    pub fn send(mut mails: HashMap<&Addr, Vec<Msg>>) -> Result<()> {
87        mails.iter_mut().for_each(|(addr, msgs)| {
88            msgs.iter_mut().for_each(|msg| {
89                msg.set_recipient_addr(addr);
90            });
91            if let Some(host_addr) = addr.get_socket_addr() {
92                match Client::connect(host_addr) {
93                    Ok(mut client) => {
94                        if let Err(err) = client.send(msgs) {
95                            println!("{}", err);
96                        } else {
97                            println!("Messages sent to host {}", host_addr);
98                        }
99                    }
100                    Err(err) => {
101                        eprintln!("Host: {} {}", host_addr, err);
102                        let _rs = Self::handle_err(msgs, host_addr, err);
103                    }
104                }
105            }
106        });
107        Ok(())
108    }
109
110    pub(crate) fn mail(mail: Mail) -> Result<()> {
111        Self::group_by(mail.take_all())
112            .into_iter()
113            .for_each(|(host_addr, mut msgs)| {
114                let _rs = match Client::connect(host_addr) {
115                    Ok(mut client) => match client.send(&mut msgs) {
116                        Ok(ok) => {
117                            println!("Messages sent to {}", host_addr);
118                            Ok(ok)
119                        }
120                        Err(err) => {
121                            eprintln!("{}", err);
122                            Err(err.into())
123                        }
124                    },
125                    Err(err) => {
126                        eprintln!("Host: {} {}", host_addr, err);
127                        Self::handle_err(&mut msgs, host_addr, err)
128                    }
129                };
130            });
131        Ok(())
132    }
133
134    fn group_by(msgs: Vec<Msg>) -> HashMap<SocketAddr, Vec<Msg>> {
135        let mut groups: HashMap<SocketAddr, Vec<Msg>> = HashMap::new();
136        for msg in msgs {
137            let host_addr = match msg.get_to() {
138                Some(ref addr) => addr.get_socket_addr().expect("host"),
139                None => continue,
140            };
141            groups.entry(host_addr).or_default().push(msg);
142        }
143        groups
144    }
145
146    fn handle_err(msgs: &mut Vec<Msg>, addr: SocketAddr, err: std::io::Error) -> Result<()> {
147        match err.kind() {
148            ConnectionRefused => Self::try_bootup_and_resend(msgs, addr),
149            _ => {
150                eprintln!("{}", err);
151                Err(MsgSendError(err))
152            }
153        }
154    }
155    fn try_bootup_and_resend(msgs: &mut Vec<Msg>, socket_addr: SocketAddr) -> Result<()> {
156        if Addr::is_ip_local(socket_addr.ip()) {
157            if !msgs.is_empty()
158                && (msgs[0].command_equals(Action::Shutdown)
159                    || msgs[0].command_equals(Action::Echo("".to_string())))
160            {
161                return Ok(());
162            } else {
163                if let Err(err) = Self::bootup() {
164                    eprintln!("Bootup error {:?}", err);
165                }
166                let mail: Mail = std::mem::take(msgs).into();
167                thread::sleep(Duration::from_millis(100));
168                if let Err(err) = Messenger::mail(mail) {
169                    eprintln!("Messenger mail error {:?}", err);
170                }
171            }
172        }
173        Ok(())
174    }
175
176    ///Boots up the msg listener(`MessageListener`) binary. The resident binary path is
177    ///configurable via the environment variable 'resident_listener'.
178
179    pub fn bootup() -> Result<()> {
180        let mut resident_listener = std::env::current_dir()?;
181        resident_listener.push(Config::get_shared().resident_listener());
182        let path = resident_listener.as_path().to_str();
183        match path {
184            Some(path) => {
185                Command::new(path).spawn()?;
186            }
187            None => eprintln!("Listener binary could not be found in {:?}", path),
188        }
189        Ok(())
190    }
191}
192
193pub(super) mod client {
194
195    use crate::{option_of_bytes, Mail, Msg};
196    use byte_marks::ByteMarker;
197
198    use std::io::{BufReader, BufWriter, Error, ErrorKind, Read, Result, Write};
199    use std::net::{TcpStream, ToSocketAddrs};
200
201    pub struct Client<'a> {
202        reader: BufReader<TcpStream>,
203        writer: BufWriter<TcpStream>,
204        marker: ByteMarker<'a>,
205    }
206
207    impl Client<'_> {
208        pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
209            let stream = TcpStream::connect(addr)?;
210            let write_half = stream.try_clone()?;
211            Ok(Client {
212                reader: BufReader::new(stream),
213                writer: BufWriter::new(write_half),
214                marker: ByteMarker::with_defaults(),
215            })
216        }
217
218        pub fn send(&mut self, msgs: &mut Vec<Msg>) -> Result<()> {
219            let bulk = Mail::Bulk(std::mem::take(msgs));
220            match option_of_bytes(&bulk) {
221                Some(ref mut bytes) => {
222                    self.marker.mark_tail(bytes);
223                    self.writer.write_all(bytes)?;
224                    self.writer.flush()?;
225                    let mut buf = vec![0; 256];
226                    let len = self.reader.read(&mut buf)?;
227                    println!("{}", String::from_utf8_lossy(&buf[..len]));
228                    Ok(())
229                }
230                None => {
231                    eprintln!("Error converting message to bytes");
232                    Err(Error::new(
233                        ErrorKind::Other,
234                        "Error converting message to bytes",
235                    ))
236                }
237            }
238        }
239    }
240}