dotlib/
dot.rs

1use std::result::{Result};
2use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
3use std::thread::{self, JoinHandle};
4use std::string::ToString;
5use std::any::Any;
6use std::str;
7use regex::Regex;
8use multicast;
9use helper::*;
10use plugin_mngt::{PluginManager};
11use dynamic_reload::{DynamicReload, Lib, Symbol, Search, PlatformName, UpdateState};
12use json::JsonValue;
13
14
15lazy_static! {
16    static ref GET: Regex = Regex::new("^get|^GET").unwrap();
17    static ref POST: Regex = Regex::new("^post|^POST").unwrap();
18    static ref PUT: Regex = Regex::new("^put|^PUT").unwrap();
19    static ref DELETE: Regex = Regex::new("^delete|^DELETE").unwrap();
20}
21
22pub struct Dot<'a> {
23    addr: IpAddr,
24    cmd_delimiter: &'a str,
25    data: json::JsonValue,
26    listener: Option<JoinHandle<()>>,
27    plugin_mngr: PluginManager<'a>,
28    port: u16,
29    speaker: Option<UdpSocket>,
30}
31
32impl<'a> Dot<'a>{
33    pub fn new(addr_ip: IpAddr, port_number: u16, data_point: &[u8], plugin_dir: Option<&'a str>) -> Dot<'a> {
34        let plg_dir = plugin_dir.unwrap_or_else(||"./plugins");
35        let mut pm = PluginManager::new(vec![plg_dir], "/tmp/plugins");
36        assert!(pm.add_plugin("plugin_helloWorld").is_ok());
37
38        let mut o = json::JsonValue::new_object();
39        let mut d = json::JsonValue::new_array();
40
41        for byt in data_point{
42            d.push(*byt as u64);
43        }
44
45        o["data"] = d;
46        
47        Dot{
48            addr: addr_ip,
49            cmd_delimiter: "::",
50            data: o,
51            listener: None,
52            plugin_mngr: pm,
53            port: port_number,
54            speaker: None,
55        }
56    }
57
58    pub fn drop_plugins(&mut self) -> std::result::Result<i32, &'static str>{
59        println!("dropping plugins...");
60        self.plugin_mngr.unload()
61    }
62
63    pub fn get_datapoint(&self) -> Vec<u8>{
64        let mut arr: Vec<u8> = Vec::new();
65
66        for byt in self.data["data"].members(){
67            arr.push(byt.as_u8().unwrap().clone());
68        }
69        
70        arr
71    }
72
73    pub fn health(&mut self) -> std::result::Result<i32, &'static str>{
74        Ok(1)
75    }
76
77    /// This function maps the end user's command (.e.g; SELF::ATTR::name) to the plugin that owns to the 
78    /// processing of the command. 
79    pub fn process_command(&mut self, message: &[u8]) -> std::result::Result<Box<Any>, &'static str>{
80        let msg = str::from_utf8(&message).unwrap();
81        let cmds: Vec<&str> = msg.split(CMD_DELIMITER).collect();
82        //Ok(Box::new(cmds[0].to_string()))
83        
84        match 1 {
85            1 | _ if GET.is_match(cmds[0]) => {
86                match self.speak(cmds[1].as_bytes()) {
87                    Ok(v) => Ok(Box::new("want! want! want!".to_string())),
88                    Err(e) => Err(e),
89                }
90            },
91            _ => {
92                self.plugin_mngr.refresh_plugins();
93
94                let plgs = self.plugin_mngr.get_plugins();
95
96                for plg in self.plugin_mngr.get_plugins() {
97                    let pc: Symbol<extern fn(message: &[u8], data: &json::JsonValue) -> json::JsonValue> = unsafe {
98                        plg.lib.get(b"process_command\0").unwrap()
99                    };
100
101                    self.data = pc(message, &self.data.clone());
102                }
103
104                Ok(Box::new("processed...".to_string()))
105            }
106        }
107    }
108
109    pub fn start_listener(&mut self) -> std::result::Result<i32, &'static str>{
110        assert!(self.addr.is_multicast());
111
112        let addr = SocketAddr::new(self.addr, self.port);
113        self.listener = Some(multicast::multicast_listener(addr));
114
115        Ok(1)
116    }
117
118    pub fn stop_listener(&mut self) -> std::result::Result<i32, &'static str>{
119        self.speak(b"stop")
120    }
121
122    pub fn speak(&mut self, message: &[u8]) -> std::result::Result<i32, &'static str>{
123        let addr = SocketAddr::new(self.addr, self.port);
124        let sender = multicast::new_sender(&addr);
125
126        match sender {
127            Ok(val) => {
128                let speaker = Some(val);
129                speaker.unwrap().send_to(message, &addr).expect("could not send_to!");
130                Ok(1)
131            },
132            Err(err) => {
133                println!("{}",err);
134                Err("Warning: Couldn't start the data dot speaker!")
135            },
136        }
137    }
138}
139
140/// UNIT TESTING
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use std::time;
145    use std::time::{Duration, Instant};
146
147    fn wait(){
148        let ten_millis = time::Duration::from_millis(1000);
149        let now = time::Instant::now();
150        thread::sleep(ten_millis);
151        assert!(now.elapsed() >= ten_millis);
152    }
153
154    #[test]
155    fn test_get_datapoint() {
156        let a: IpAddr = Ipv6Addr::new(0xFF03, 0, 0, 0, 0, 0, 0, 0x0123).into();
157        let p: u16 = 7645;
158        let msg = String::from("datapoint_test");
159        let path = Some("./target/debug/examples");
160        let dot = Dot::new(a, p, &msg.as_bytes(), path);
161
162        assert_eq!(dot.get_datapoint(), msg.as_bytes());
163    }
164
165    #[test]
166    fn test_health() {
167        let a: IpAddr = Ipv6Addr::new(0xFF03, 0, 0, 0, 0, 0, 0, 0x0123).into();
168        let p: u16 = 7645;
169        let path = Some("./target/debug/examples");
170        let mut dot = Dot::new(a, p, String::from("hello").as_bytes(), path);
171
172        assert!(dot.health().is_ok());
173    }
174
175    #[test]
176    fn test_communication() {
177        let a: IpAddr = Ipv4Addr::new(224, 0, 0, 123).into();
178        let p: u16 = 7645;
179        let path = Some("./target/debug/examples");
180        let mut dot = Dot::new(a, p, String::from("hello").as_bytes(), path);
181
182        assert!(dot.start_listener().is_ok());
183        assert!(dot.process_command(b"echo::first message").is_ok());    
184
185        //wait a few moments
186        wait();
187
188        assert!(dot.process_command(b"echo::second message").is_ok()); 
189        assert!(dot.stop_listener().is_ok());
190    }
191
192
193    #[test]
194    fn test_process_commands() {
195        let a: IpAddr = Ipv4Addr::new(224, 0, 0, 123).into();
196        let p: u16 = 7645;
197        let path = Some("./target/debug/examples");
198        let mut dot = Dot::new(a, p, String::from("hello").as_bytes(), path);
199        let ok_cmds = vec!["GET::test"];
200 
201        for cmd in ok_cmds {
202            assert!(dot.process_command(cmd.as_bytes()).is_ok());
203        }
204
205        //assert!(dot.process_command(b"blah::test").is_err());  
206    }
207}