blockade/
blockade.rs

1use std::collections::HashMap;
2use std::{error, fmt};
3
4use serde_json;
5
6use rand::{seq, thread_rng};
7use reqwest;
8
9use common::*;
10
11#[derive(Debug)]
12pub enum BlockadeError {
13    HttpError(reqwest::Error),
14    ServerError(String),
15    OtherError(String),
16    JsonError(serde_json::Error),
17}
18
19impl fmt::Display for BlockadeError {
20    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21        match *self {
22            BlockadeError::HttpError(ref n) => write!(f, "HTTP error: {:?}", n),
23            BlockadeError::OtherError(ref n) => write!(f, "Other error: {:?}", n),
24            BlockadeError::ServerError(ref n) => write!(f, "Server error: {:?}", n),
25            BlockadeError::JsonError(ref n) => write!(f, "JSON parsing error: {:?}", n),
26        }
27    }
28}
29
30impl From<reqwest::Error> for BlockadeError {
31    fn from(error: reqwest::Error) -> BlockadeError {
32        return BlockadeError::HttpError(error);
33    }
34}
35
36impl From<serde_json::Error> for BlockadeError {
37    fn from(error: serde_json::Error) -> BlockadeError {
38        return BlockadeError::JsonError(error);
39    }
40}
41
42impl error::Error for BlockadeError {
43    fn description(&self) -> &str {
44        "Something went wrong with the blockade"
45    }
46    fn cause(&self) -> Option<&error::Error> {
47        return None;
48    }
49}
50
51#[derive(Debug)]
52pub struct BlockadeHandler {
53    pub client: reqwest::Client,
54    pub host: String,
55    pub blockades: Vec<String>,
56    pub state: HashMap<String, BlockadeState>,
57    pub config: HashMap<String, BlockadeConfig>,
58}
59
60impl BlockadeHandler {
61    /// Make a new BlockadeHandler that uses a blockade instance
62    /// started at "host".
63    pub fn new(host: &str) -> Self {
64        let client = reqwest::Client::new();
65        let mut handler = BlockadeHandler {
66            client: client,
67            host: host.to_owned(),
68            blockades: Vec::new(),
69            state: HashMap::new(),
70            config: HashMap::new(),
71        };
72        match handler.execute_list_blockades() {
73            Ok(_val) => {
74                for i in 0..handler.blockades.len() {
75                    let blockade_name = handler.blockades[i].to_owned();
76                    match handler.execute_get_blockade(&blockade_name) {
77                        Ok(_val) => {}
78                        Err(_e) => {}
79                    }
80                }
81            }
82            Err(_e) => {}
83        }
84        return handler;
85    }
86
87    /// Returns all container names in default String order (lexicographical).
88    pub fn get_all_containers(&mut self, name: &str) -> Result<Vec<String>, BlockadeError> {
89        self.execute_get_blockade(name)?;
90        let mut all_containers: Vec<String> = if self.state.contains_key(name) {
91            self.state[name]
92                .containers
93                .keys()
94                .map(|val: &String| val.clone())
95                .collect()
96        } else {
97            Vec::new()
98        };
99        all_containers.sort();
100        return Ok(all_containers);
101    }
102
103    pub fn choose_random_container(&mut self, name: &str) -> Result<String, BlockadeError> {
104        if self.state.contains_key(name) && self.state[name].containers.keys().len() >= 1 {
105            let mut rng = thread_rng();
106            let state = self.state.clone();
107            let keys = state.get(name).unwrap().containers.keys();
108            let container = seq::sample_iter(&mut rng, keys, 1)
109                .unwrap()
110                .pop()
111                .unwrap()
112                .clone();
113            return Ok(container.into());
114        } else if !self.state.contains_key(name) {
115            return Err(BlockadeError::OtherError(String::from(
116                "Blockade not found in map",
117            )));
118        } else {
119            return Err(BlockadeError::OtherError(String::from(
120                "No containers to choose from",
121            )));
122        }
123    }
124
125    /// Start a blockade from a given name and config struct.
126    pub fn start_blockade(
127        &mut self,
128        name: &str,
129        config: BlockadeConfig,
130        restart: bool,
131    ) -> Result<(), BlockadeError> {
132        match self.execute_setup(name, config.clone()) {
133            Ok(_) => {}
134            Err(e) => {
135                if restart {
136                    match e {
137                        BlockadeError::ServerError(s) => {
138                            if s == String::from("Blockade name already exists") {
139                                self.destroy_blockade(name)?;
140                                self.execute_setup(name, config.clone())?;
141                                return Ok(());
142                            }
143                        }
144                        _ => {}
145                    }
146                }
147            }
148        };
149        self.execute_get_blockade(name)?;
150        return Ok(());
151    }
152
153    pub fn start_container(&mut self, name: &str, container: &str) -> Result<(), BlockadeError> {
154        self.execute_command(name, BlockadeCommand::Start, vec![container.into()])?;
155        self.execute_get_blockade(name)?;
156        return Ok(());
157    }
158
159    /// Stop a container by blockade name and container name.
160    pub fn stop_container(&mut self, name: &str, container: &str) -> Result<(), BlockadeError> {
161        self.execute_command(name, BlockadeCommand::Stop, vec![container.into()])?;
162        self.execute_get_blockade(name)?;
163        return Ok(());
164    }
165
166    /// Restart a container by blockade name and container name.
167    pub fn restart_container(&mut self, name: &str, container: &str) -> Result<(), BlockadeError> {
168        self.execute_command(name, BlockadeCommand::Restart, vec![container.into()])?;
169        self.execute_get_blockade(name)?;
170        return Ok(());
171    }
172
173    /// Restart a random-ish container.  Returns the name of the restarted container.
174    pub fn restart_one(&mut self, name: &str) -> Result<String, BlockadeError> {
175        let container = self.choose_random_container(name)?;
176        self.restart_container(name, &container)?;
177        return Ok(container);
178    }
179
180    /// Kills a container by blockade name and container name.
181    pub fn kill_container(&mut self, name: &str, container: &str) -> Result<(), BlockadeError> {
182        self.execute_command(name, BlockadeCommand::Kill, vec![container.into()])?;
183        self.execute_get_blockade(name)?;
184        return Ok(());
185    }
186
187    /// Kill a random-ish container.  Returns the name of the killed container.
188    pub fn kill_one(&mut self, name: &str) -> Result<String, BlockadeError> {
189        let container = self.choose_random_container(name)?;
190        self.kill_container(name, &container)?;
191        return Ok(container);
192    }
193
194    /// Makes partitions according to the given nested Vec<Vec<String>> of container names.
195    pub fn make_partitions(
196        &mut self,
197        name: &str,
198        partitions: Vec<Vec<String>>,
199    ) -> Result<(), BlockadeError> {
200        self.execute_partition(name, partitions)?;
201        self.execute_get_blockade(name)?;
202        return Ok(());
203    }
204
205    /// Puts all containers in one partition and restores the network QoS.
206    pub fn heal_partitions(&mut self, name: &str) -> Result<(), BlockadeError> {
207        self.execute_restore_network(name)?;
208        self.execute_get_blockade(name)?;
209        return Ok(());
210    }
211
212    /// Makes the network condition generally bad.  Introduces at least latency and dropped packets
213    /// potentially also causes reordering of some magnitude.
214    pub fn make_net_unreliable(&mut self, name: &str) -> Result<(), BlockadeError> {
215        let all_containers = self.get_all_containers(name)?;
216        self.execute_net_command(name, BlockadeNetStatus::Flaky, all_containers)?;
217        self.execute_get_blockade(name)?;
218        return Ok(());
219    }
220
221    /// Makes the network condition as good as can be given the host conditions.  Generally this
222    /// means near perfect since the containers are usually on the local machine and the OS is
223    /// reasonably good about pushing packets.
224    pub fn make_net_fast(&mut self, name: &str) -> Result<(), BlockadeError> {
225        let all_containers = self.get_all_containers(name)?;
226        self.execute_net_command(name, BlockadeNetStatus::Fast, all_containers)?;
227        self.execute_get_blockade(name)?;
228        return Ok(());
229    }
230
231    /// Shuts down the blockade and all of its containers.  Probably don't want to use this
232    /// blockade afterward, considering it's pretty final.
233    pub fn destroy_blockade(&mut self, name: &str) -> Result<(), BlockadeError> {
234        self.execute_get_blockade(name)?;
235        self.execute_delete_blockade(name)?;
236        return Ok(());
237    }
238
239    pub fn fetch_state(&mut self) -> Result<(), BlockadeError> {
240        self.execute_list_blockades()?;
241        let blockades = self.blockades.clone();
242        for blockade in blockades.iter() {
243            self.execute_get_blockade(&blockade)?;
244        }
245        return Ok(());
246    }
247
248    fn execute_setup(&mut self, name: &str, config: BlockadeConfig) -> Result<(), BlockadeError> {
249        self.config.insert(name.into(), config.clone());
250
251        let json = serde_json::to_string_pretty(&config).expect("Failed to serialize config");
252        trace!("Config: {}", json);
253
254        let mut res = self.client
255            .post(format!("{}/blockade/{}", self.host, name).as_str())
256            .json(&config)
257            .send()?;
258
259        debug!("Posted to server with status: {}", res.status());
260
261        if res.status().is_success() {
262            return Ok(());
263        } else {
264            return Err(BlockadeError::ServerError(res.text()?));
265        }
266    }
267
268    fn execute_command(
269        &mut self,
270        name: &str,
271        command: BlockadeCommand,
272        containers: Vec<String>,
273    ) -> Result<(), BlockadeError> {
274        let args = BlockadeCommandArgs {
275            command,
276            container_names: containers,
277        };
278
279        let mut res = self.client
280            .post(format!("{}/blockade/{}/action", self.host, name).as_str())
281            .json(&args)
282            .send()?;
283
284        debug!("Posted to server with status: {}", res.status());
285
286        if res.status().is_success() {
287            return Ok(());
288        } else {
289            return Err(BlockadeError::ServerError(res.text()?));
290        }
291    }
292
293    fn execute_net_command(
294        &mut self,
295        name: &str,
296        network_state: BlockadeNetStatus,
297        container_names: Vec<String>,
298    ) -> Result<(), BlockadeError> {
299        let args = BlockadeNetArgs {
300            network_state,
301            container_names: container_names,
302        };
303
304        let mut res = self.client
305            .post(format!("{}/blockade/{}/network_state", self.host, name).as_str())
306            .json(&args)
307            .send()?;
308
309        debug!("Posted to server with status: {}", res.status());
310
311        if res.status().is_success() {
312            return Ok(());
313        } else {
314            return Err(BlockadeError::ServerError(res.text()?));
315        }
316    }
317
318    fn execute_partition(
319        &mut self,
320        name: &str,
321        partitions: Vec<Vec<String>>,
322    ) -> Result<(), BlockadeError> {
323        let args = BlockadePartitionArgs { partitions };
324
325        let mut res = self.client
326            .post(format!("{}/blockade/{}/partitions", self.host, name).as_str())
327            .json(&args)
328            .send()?;
329
330        debug!("Posted to server with status: {}", res.status());
331
332        if res.status().is_success() {
333            return Ok(());
334        } else {
335            return Err(BlockadeError::ServerError(res.text()?));
336        }
337    }
338
339    fn execute_restore_network(&mut self, name: &str) -> Result<(), BlockadeError> {
340        let mut res = self.client
341            .delete(format!("{}/blockade/{}/partitions", self.host, name).as_str())
342            .send()?;
343
344        debug!("Sent delete to server with status: {}", res.status());
345
346        if res.status().is_success() {
347            return Ok(());
348        } else {
349            return Err(BlockadeError::ServerError(res.text()?));
350        }
351    }
352
353    fn execute_list_blockades(&mut self) -> Result<(), BlockadeError> {
354        let mut res = self.client
355            .get(format!("{}/blockade", self.host).as_str())
356            .send()?;
357
358        debug!("Sent get to server with status: {}", res.status());
359
360        if res.status().is_success() {
361            let raw_text = res.text()?;
362            debug!("Raw response from server: {:#?}", &raw_text);
363            let v: HashMap<String, Vec<String>> = serde_json::from_str(&raw_text)?;
364            self.blockades = match v.get("blockades") {
365                Some(n) => (n.clone()).into(),
366                None => Vec::new(),
367            };
368            return Ok(());
369        } else {
370            return Err(BlockadeError::ServerError(res.text()?));
371        }
372    }
373
374    fn execute_get_blockade(&mut self, name: &str) -> Result<(), BlockadeError> {
375        let mut res = self.client
376            .get(format!("{}/blockade/{}", self.host, name).as_str())
377            .send()?;
378
379        debug!("Sent get to server with status: {}", res.status());
380
381        if res.status().is_success() {
382            let raw_text = res.text()?;
383            debug!("Raw response from server: {:#?}", &raw_text);
384            let s: BlockadeState = serde_json::from_str(&raw_text)?;
385            self.state.insert(name.into(), s);
386            return Ok(());
387        } else {
388            return Err(BlockadeError::ServerError(res.text()?));
389        }
390    }
391
392    fn execute_delete_blockade(&mut self, name: &str) -> Result<(), BlockadeError> {
393        let mut res = self.client
394            .delete(format!("{}/blockade/{}", self.host, name).as_str())
395            .send()?;
396
397        debug!("Sent delete to server with status: {}", res.status());
398
399        if res.status().is_success() {
400            if self.state.contains_key(name) {
401                self.state.remove(name);
402            }
403            return Ok(());
404        } else {
405            return Err(BlockadeError::ServerError(res.text()?));
406        }
407    }
408}