1use std::result::{Result};
2use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
3use std::thread::{self, JoinHandle};
4use std::string::ToString;
5use std::any::Any;
6use std::str;
7use regex::Regex;
8use multicast;
9use helper::*;
10use plugin_mngt::{PluginManager};
11use dynamic_reload::{DynamicReload, Lib, Symbol, Search, PlatformName, UpdateState};
12use json::JsonValue;
13
14
15lazy_static! {
16 static ref GET: Regex = Regex::new("^get|^GET").unwrap();
17 static ref POST: Regex = Regex::new("^post|^POST").unwrap();
18 static ref PUT: Regex = Regex::new("^put|^PUT").unwrap();
19 static ref DELETE: Regex = Regex::new("^delete|^DELETE").unwrap();
20}
21
22pub struct Dot<'a> {
23 addr: IpAddr,
24 cmd_delimiter: &'a str,
25 data: json::JsonValue,
26 listener: Option<JoinHandle<()>>,
27 plugin_mngr: PluginManager<'a>,
28 port: u16,
29 speaker: Option<UdpSocket>,
30}
31
32impl<'a> Dot<'a>{
33 pub fn new(addr_ip: IpAddr, port_number: u16, data_point: &[u8], plugin_dir: Option<&'a str>) -> Dot<'a> {
34 let plg_dir = plugin_dir.unwrap_or_else(||"./plugins");
35 let mut pm = PluginManager::new(vec![plg_dir], "/tmp/plugins");
36 assert!(pm.add_plugin("plugin_helloWorld").is_ok());
37
38 let mut o = json::JsonValue::new_object();
39 let mut d = json::JsonValue::new_array();
40
41 for byt in data_point{
42 d.push(*byt as u64);
43 }
44
45 o["data"] = d;
46
47 Dot{
48 addr: addr_ip,
49 cmd_delimiter: "::",
50 data: o,
51 listener: None,
52 plugin_mngr: pm,
53 port: port_number,
54 speaker: None,
55 }
56 }
57
58 pub fn drop_plugins(&mut self) -> std::result::Result<i32, &'static str>{
59 println!("dropping plugins...");
60 self.plugin_mngr.unload()
61 }
62
63 pub fn get_datapoint(&self) -> Vec<u8>{
64 let mut arr: Vec<u8> = Vec::new();
65
66 for byt in self.data["data"].members(){
67 arr.push(byt.as_u8().unwrap().clone());
68 }
69
70 arr
71 }
72
73 pub fn health(&mut self) -> std::result::Result<i32, &'static str>{
74 Ok(1)
75 }
76
77 pub fn process_command(&mut self, message: &[u8]) -> std::result::Result<Box<Any>, &'static str>{
80 let msg = str::from_utf8(&message).unwrap();
81 let cmds: Vec<&str> = msg.split(CMD_DELIMITER).collect();
82 match 1 {
85 1 | _ if GET.is_match(cmds[0]) => {
86 match self.speak(cmds[1].as_bytes()) {
87 Ok(v) => Ok(Box::new("want! want! want!".to_string())),
88 Err(e) => Err(e),
89 }
90 },
91 _ => {
92 self.plugin_mngr.refresh_plugins();
93
94 let plgs = self.plugin_mngr.get_plugins();
95
96 for plg in self.plugin_mngr.get_plugins() {
97 let pc: Symbol<extern fn(message: &[u8], data: &json::JsonValue) -> json::JsonValue> = unsafe {
98 plg.lib.get(b"process_command\0").unwrap()
99 };
100
101 self.data = pc(message, &self.data.clone());
102 }
103
104 Ok(Box::new("processed...".to_string()))
105 }
106 }
107 }
108
109 pub fn start_listener(&mut self) -> std::result::Result<i32, &'static str>{
110 assert!(self.addr.is_multicast());
111
112 let addr = SocketAddr::new(self.addr, self.port);
113 self.listener = Some(multicast::multicast_listener(addr));
114
115 Ok(1)
116 }
117
118 pub fn stop_listener(&mut self) -> std::result::Result<i32, &'static str>{
119 self.speak(b"stop")
120 }
121
122 pub fn speak(&mut self, message: &[u8]) -> std::result::Result<i32, &'static str>{
123 let addr = SocketAddr::new(self.addr, self.port);
124 let sender = multicast::new_sender(&addr);
125
126 match sender {
127 Ok(val) => {
128 let speaker = Some(val);
129 speaker.unwrap().send_to(message, &addr).expect("could not send_to!");
130 Ok(1)
131 },
132 Err(err) => {
133 println!("{}",err);
134 Err("Warning: Couldn't start the data dot speaker!")
135 },
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Port shared by every dot created in these tests.
    const PORT: u16 = 7645;
    /// Directory the tests load plugins from.
    const PLUGIN_DIR: &'static str = "./target/debug/examples";

    /// IPv6 multicast group used by the tests that never open a socket.
    fn ipv6_group() -> IpAddr {
        Ipv6Addr::new(0xFF03, 0, 0, 0, 0, 0, 0, 0x0123).into()
    }

    /// IPv4 multicast group used by the tests that send commands.
    fn ipv4_group() -> IpAddr {
        Ipv4Addr::new(224, 0, 0, 123).into()
    }

    /// Creates a dot on the shared port that loads plugins from `PLUGIN_DIR`.
    fn build_dot(addr: IpAddr, data_point: &[u8]) -> Dot<'static> {
        Dot::new(addr, PORT, data_point, Some(PLUGIN_DIR))
    }

    /// Sleeps for one second and checks that at least that long has elapsed.
    fn wait() {
        let pause = Duration::from_millis(1000);
        let started = Instant::now();
        thread::sleep(pause);
        assert!(started.elapsed() >= pause);
    }

    #[test]
    fn test_get_datapoint() {
        let expected: &[u8] = b"datapoint_test";
        let dot = build_dot(ipv6_group(), expected);

        assert_eq!(dot.get_datapoint(), expected);
    }

    #[test]
    fn test_health() {
        let mut dot = build_dot(ipv6_group(), b"hello");

        assert!(dot.health().is_ok());
    }

    #[test]
    fn test_communication() {
        let mut dot = build_dot(ipv4_group(), b"hello");

        assert!(dot.start_listener().is_ok());
        assert!(dot.process_command(b"echo::first message").is_ok());

        wait();

        assert!(dot.process_command(b"echo::second message").is_ok());
        assert!(dot.stop_listener().is_ok());
    }

    #[test]
    fn test_process_commands() {
        let mut dot = build_dot(ipv4_group(), b"hello");

        for cmd in ["GET::test"].iter() {
            assert!(dot.process_command(cmd.as_bytes()).is_ok());
        }
    }
}