ipc_chan/
lib.rs

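//! A small message channel over ZeroMQ `REQ`/`REP` sockets: a `Source` sends JSON-serialized values and a `Sink` receives and acknowledges them.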
2
3mod error;
4mod config;
5
6pub use crate::error::{Error, Result};
7pub use crate::config::Config;
8use std::path::Path;
9use serde::{Deserialize, Serialize};
10
11
/// Reply a `Sink` sends back to acknowledge a received message. It's a little passive-aggressive, but it'll work.
13const ACK: &str = "K";
14
/// A convenience macro to "print" formatted text to a `Source`.
///
/// The first argument is an expression naming a `Source`; it is borrowed
/// mutably for the duration of the call. Everything after the first comma is
/// handed to `format!` untouched, so positional arguments, named arguments
/// (`name = value`) and a trailing comma are all accepted.
///
/// The macro evaluates to whatever `Source::send` returns, so callers
/// normally append `?` (or otherwise handle the `Result`).
#[macro_export]
macro_rules! sendstr {
    ($source:expr, $($arg:tt)+) => {{
        // The explicit annotation turns a wrong first argument into a type
        // error at this line instead of a confusing "no method `send`" error.
        let source: &mut $crate::Source = &mut $source;
        // Forward the raw tokens: matching them as `expr` fragments would
        // prevent `format!` from recognising `name = value` arguments.
        source.send(&format!($($arg)+))
    }};
}
23
24
25pub struct Source {
26    #[allow(unused)]
27    ctx: zmq::Context,
28    socket: zmq::Socket,
29    #[allow(unused)]
30    cfg: Config,
31}
32
33impl Source {
34    pub fn from_toml<P: AsRef<Path>>(toml_path: P) -> Result<Self> {
35        let cfg = Config::parse_toml(toml_path)?;
36        Self::from_config(cfg)
37    }
38
39    pub fn from_config(cfg: Config) -> Result<Self> {
40        let ctx = zmq::Context::new();
41        let socket = ctx.socket(zmq::REQ)?;
42        socket.connect(&format!("tcp://{}:{}", cfg.host, cfg.port))?;
43        Ok(Self { ctx, socket, cfg })
44    }
45
46    /// Send a value of type `V`.
47    /// Return `Ok(())` if the value was sent successfully;
48    /// Otherwise return an error.
49    pub fn send<V>(&mut self, value: &V) -> Result<()>
50    where V: ?Sized + Serialize {
51        imp::send(&mut self.socket, value)?;
52        let reply: String = imp::recv(&mut self.socket)?;
53        debug_assert_eq!(reply, ACK);
54        Ok(())
55    }
56
57    #[inline(always)]
58    pub fn config(&self) -> &Config { &self.cfg }
59}
60
61
62pub struct Sink {
63    #[allow(unused)]
64    ctx: zmq::Context,
65    socket: zmq::Socket,
66    #[allow(unused)]
67    cfg: Config,
68}
69
70impl Sink {
71    pub fn from_toml<P: AsRef<Path>>(toml_path: P) -> Result<Self> {
72        let cfg = Config::parse_toml(toml_path)?;
73        Self::from_config(cfg)
74    }
75
76    pub fn from_config(cfg: Config) -> Result<Self> {
77        let ctx = zmq::Context::new();
78        let socket = ctx.socket(zmq::REP)?;
79        socket.bind(&format!("tcp://*:{}", cfg.port))?;
80        Ok(Self { ctx, socket, cfg })
81    }
82
83
84    pub fn recv<V>(&mut self) -> Result<V>
85    where V: for<'de> Deserialize<'de> {
86        let msg: V = imp::recv(&mut self.socket)?;
87        imp::send(&mut self.socket, ACK)?;
88        Ok(msg)
89    }
90
91    #[inline(always)]
92    pub fn config(&self) -> &Config { &self.cfg }
93}
94
95
96mod imp {
97    use super::*;
98
99    const NO_FLAGS: i32 = 0;
100
101    #[inline(always)]
102    pub(super) fn send<V>(socket: &mut zmq::Socket, value: &V) -> Result<()>
103    where V: ?Sized + Serialize {
104        let s: String = serde_json::to_string(value)?;
105        socket.send(&s, NO_FLAGS)?;
106        Ok(())
107    }
108
109    #[inline(always)]
110    pub(super) fn recv<V>(socket: &mut zmq::Socket) -> Result<V>
111    where V: for<'de> Deserialize<'de> {
112        match socket.recv_string(NO_FLAGS)? {
113            Ok(s) => Ok(serde_json::from_str::<V>(&s)?),
114            Err(bytes) => Err(Error::NotUtf8Error(bytes)),
115        }
116    }
117}
118
119
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result;
    use serde_derive::{Deserialize, Serialize};

    /// Non-string payload used to check that structured values round-trip.
    #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
    struct Foo(String, usize);

    /// One source sends two strings and a `Foo`; the sink, running on its
    /// own thread, must receive them in order.
    #[test]
    fn send_and_receive_msg() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11001, // test-specific port
        };
        let mut source = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first: String = sink.recv().expect("Sink failed to receive MSG0");
            assert_eq!(first, "Hello World! 0");
            let second: String = sink.recv().expect("Sink failed to receive MSG1");
            assert_eq!(second, "Hello World! 1");
            let third: Foo = sink.recv().expect("Sink failed to receive MSG2");
            let expected = Foo(String::from("Hello World! 2"), 42);
            assert_eq!(third, expected);
        });

        source.send("Hello World! 0")?;
        source.send("Hello World! 1")?;
        let payload = Foo(String::from("Hello World! 2"), 42);
        source.send(&payload)?;

        receiver.join().unwrap();
        Ok(())
    }

    /// Two sources connected to the same sink take turns sending.
    #[test]
    fn multiple_senders() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11002, // test-specific port
        };
        let mut first_source = Source::from_config(cfg.clone())?;
        let mut second_source = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first: String = sink.recv().expect("Sink failed to receive MSG0");
            assert_eq!(first, "Hello World! 0");
            let second: String = sink.recv().expect("Sink failed to receive MSG1");
            assert_eq!(second, "Hello World! 1");
            let third: Foo = sink.recv().expect("Sink failed to receive MSG2");
            let expected = Foo(String::from("Hello World! 2"), 42);
            assert_eq!(third, expected);
        });

        first_source.send("Hello World! 0")?;
        second_source.send("Hello World! 1")?;
        let payload = Foo(String::from("Hello World! 2"), 42);
        first_source.send(&payload)?;

        receiver.join().unwrap();
        Ok(())
    }

    /// Same exchange as `multiple_senders`, with the string messages built
    /// by the `sendstr!` macro.
    #[test]
    fn send_str_macro() -> Result<()> {
        let cfg = Config {
            host: String::from("127.0.0.1"),
            port: 11003, // test-specific port
        };
        let mut first_source = Source::from_config(cfg.clone())?;
        let mut second_source = Source::from_config(cfg.clone())?;
        let mut sink = Sink::from_config(cfg)?;

        let receiver = std::thread::spawn(move || {
            let first: String = sink.recv().expect("Sink failed to receive msg0");
            assert_eq!(first, "Hello World! 0");
            let second: String = sink.recv().expect("Sink failed to receive msg1");
            assert_eq!(second, "Hello World! 1");
            let third: Foo = sink.recv().expect("Sink failed to receive msg2");
            let expected = Foo(String::from("Hello World! 2"), 42);
            assert_eq!(third, expected);
        });

        sendstr!(first_source, "Hello World! {}", 0)?;
        sendstr!(second_source, "Hello World! {}", 1)?;
        let payload = Foo(String::from("Hello World! 2"), 42);
        first_source.send(&payload)?;

        receiver.join().unwrap();
        Ok(())
    }

    /// The `ipc-chan.toml` file must parse to the same host and port as
    /// `Config::default()`.
    #[test]
    fn read_config_file() -> Result<()> {
        let parsed = Config::parse_toml("ipc-chan.toml")?;
        let defaults = Config::default();
        assert_eq!(parsed.host, defaults.host);
        assert_eq!(parsed.port, defaults.port);
        Ok(())
    }
}