1mod error;
4mod config;
5
6pub use crate::error::{Error, Result};
7pub use crate::config::Config;
8use std::path::Path;
9use serde::{Deserialize, Serialize};
10
11
/// Acknowledgement payload of the request/reply handshake: `Sink::recv` sends it
/// back after every received message and `Source::send` compares the reply to it.
const ACK: &str = "K";
14
/// Formats a message with `format!` syntax and sends it through a `Source`.
///
/// The first argument is an expression that can be mutably borrowed as a
/// `Source`. Everything after the first comma is handed to `format!` verbatim,
/// so positional arguments, named arguments (`name = value`) and a trailing
/// comma are all accepted. The macro evaluates to whatever `Source::send`
/// returns.
#[macro_export]
macro_rules! sendstr {
    ($source:expr, $($fmt_args:tt)+) => {{
        // The explicit type annotation yields a readable error when something
        // other than a `Source` is passed as the first argument.
        let source: &mut $crate::Source = &mut $source;
        source.send(&format!($($fmt_args)+))
    }};
}
23
24
25pub struct Source {
26 #[allow(unused)]
27 ctx: zmq::Context,
28 socket: zmq::Socket,
29 #[allow(unused)]
30 cfg: Config,
31}
32
33impl Source {
34 pub fn from_toml<P: AsRef<Path>>(toml_path: P) -> Result<Self> {
35 let cfg = Config::parse_toml(toml_path)?;
36 Self::from_config(cfg)
37 }
38
39 pub fn from_config(cfg: Config) -> Result<Self> {
40 let ctx = zmq::Context::new();
41 let socket = ctx.socket(zmq::REQ)?;
42 socket.connect(&format!("tcp://{}:{}", cfg.host, cfg.port))?;
43 Ok(Self { ctx, socket, cfg })
44 }
45
46 pub fn send<V>(&mut self, value: &V) -> Result<()>
50 where V: ?Sized + Serialize {
51 imp::send(&mut self.socket, value)?;
52 let reply: String = imp::recv(&mut self.socket)?;
53 debug_assert_eq!(reply, ACK);
54 Ok(())
55 }
56
57 #[inline(always)]
58 pub fn config(&self) -> &Config { &self.cfg }
59}
60
61
62pub struct Sink {
63 #[allow(unused)]
64 ctx: zmq::Context,
65 socket: zmq::Socket,
66 #[allow(unused)]
67 cfg: Config,
68}
69
70impl Sink {
71 pub fn from_toml<P: AsRef<Path>>(toml_path: P) -> Result<Self> {
72 let cfg = Config::parse_toml(toml_path)?;
73 Self::from_config(cfg)
74 }
75
76 pub fn from_config(cfg: Config) -> Result<Self> {
77 let ctx = zmq::Context::new();
78 let socket = ctx.socket(zmq::REP)?;
79 socket.bind(&format!("tcp://*:{}", cfg.port))?;
80 Ok(Self { ctx, socket, cfg })
81 }
82
83
84 pub fn recv<V>(&mut self) -> Result<V>
85 where V: for<'de> Deserialize<'de> {
86 let msg: V = imp::recv(&mut self.socket)?;
87 imp::send(&mut self.socket, ACK)?;
88 Ok(msg)
89 }
90
91 #[inline(always)]
92 pub fn config(&self) -> &Config { &self.cfg }
93}
94
95
96mod imp {
97 use super::*;
98
99 const NO_FLAGS: i32 = 0;
100
101 #[inline(always)]
102 pub(super) fn send<V>(socket: &mut zmq::Socket, value: &V) -> Result<()>
103 where V: ?Sized + Serialize {
104 let s: String = serde_json::to_string(value)?;
105 socket.send(&s, NO_FLAGS)?;
106 Ok(())
107 }
108
109 #[inline(always)]
110 pub(super) fn recv<V>(socket: &mut zmq::Socket) -> Result<V>
111 where V: for<'de> Deserialize<'de> {
112 match socket.recv_string(NO_FLAGS)? {
113 Ok(s) => Ok(serde_json::from_str::<V>(&s)?),
114 Err(bytes) => Err(Error::NotUtf8Error(bytes)),
115 }
116 }
117}
118
119
120
#[cfg(test)]
mod tests {
    use crate::error::Result;
    use serde_derive::{Deserialize, Serialize};
    use super::*;

    /// Tuple payload used to check that non-string values survive the round trip.
    #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
    struct Foo(String, usize);

    /// One source sends two strings and a `Foo`; the sink thread checks each in order.
    #[test]
    fn send_and_receive_msg() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11001,
        };
        let mut source = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first = sink.recv::<String>().expect("Sink failed to receive MSG0");
            assert_eq!(first, "Hello World! 0");
            let second = sink.recv::<String>().expect("Sink failed to receive MSG1");
            assert_eq!(second, "Hello World! 1");
            let third = sink.recv::<Foo>().expect("Sink failed to receive MSG2");
            assert_eq!(third, Foo(String::from("Hello World! 2"), 42));
        });

        source.send("Hello World! 0")?;
        source.send("Hello World! 1")?;
        source.send(&Foo(String::from("Hello World! 2"), 42))?;

        receiver.join().unwrap();
        Ok(())
    }

    /// Two sources connected to the same sink take turns sending.
    #[test]
    fn multiple_senders() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11002,
        };
        let mut source0 = Source::from_config(cfg.clone())?;
        let mut source1 = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first = sink.recv::<String>().expect("Sink failed to receive MSG0");
            assert_eq!(first, "Hello World! 0");
            let second = sink.recv::<String>().expect("Sink failed to receive MSG1");
            assert_eq!(second, "Hello World! 1");
            let third = sink.recv::<Foo>().expect("Sink failed to receive MSG2");
            assert_eq!(third, Foo(String::from("Hello World! 2"), 42));
        });

        source0.send("Hello World! 0")?;
        source1.send("Hello World! 1")?;
        source0.send(&Foo(String::from("Hello World! 2"), 42))?;

        receiver.join().unwrap();
        Ok(())
    }

    /// Same exchange as `multiple_senders`, with the strings sent through `sendstr!`.
    #[test]
    fn send_str_macro() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11003,
        };
        let mut source0 = Source::from_config(cfg.clone())?;
        let mut source1 = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first = sink.recv::<String>().expect("Sink failed to receive msg0");
            assert_eq!(first, "Hello World! 0");
            let second = sink.recv::<String>().expect("Sink failed to receive msg1");
            assert_eq!(second, "Hello World! 1");
            let third = sink.recv::<Foo>().expect("Sink failed to receive msg2");
            assert_eq!(third, Foo(String::from("Hello World! 2"), 42));
        });

        sendstr!(source0, "Hello World! {}", 0)?;
        sendstr!(source1, "Hello World! {}", 1)?;
        source0.send(&Foo(String::from("Hello World! 2"), 42))?;

        receiver.join().unwrap();
        Ok(())
    }

    /// The bundled `ipc-chan.toml` must parse to the same host and port as `Config::default()`.
    #[test]
    fn read_config_file() -> Result<()> {
        let parsed = Config::parse_toml("ipc-chan.toml")?;
        let expected = Config::default();
        assert_eq!(parsed.host, expected.host);
        assert_eq!(parsed.port, expected.port);
        Ok(())
    }
}