antimony 0.0.1

A topology builder crate for Antimony.
Documentation
use futures::{Future, future};
use tokio_uds::UnixStream;
use tokio_io::AsyncRead;
use tokio_core::reactor::{Core, Timeout};
use std::thread;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use components::{Message, ComponentConfig};

pub trait BaseBolt{
    fn prepare(&mut self);
    fn process_tuple(&mut self, tuple: Message);
}

#[derive(Clone)]
pub struct Bolt{
    name: String,
    in_q: Arc<Mutex<Vec<Message>>>,
    out_q: Arc<Mutex<Vec<Message>>>
}

impl Bolt{
    pub fn new(name: &str) -> Self{
        let in_q = Arc::new(Mutex::new(vec![]));
        let out_q = Arc::new(Mutex::new(vec![]));
        Bolt{
            name: name.to_string(),
            in_q: in_q,
            out_q: out_q
        }
    }

    pub fn emit(&mut self, tuple: Message){
        let mut q = self.out_q.lock().unwrap();
        q.push(tuple);
    }

    pub fn start<T>(&mut self, mut bolt: T, args: ComponentConfig) where T: BaseBolt{
        bolt.prepare();
        let in_q_clone = self.in_q.clone();
        let out_q_clone = self.out_q.clone();
        let sock_file = args.sock_file.clone();
        thread::spawn(move || {
            let mut core = Core::new().unwrap();
            let handle = core.handle();
            let socket = UnixStream::connect(sock_file, &handle).unwrap();
            let reg = Message::Local(args.component_id);
            let socket = core.run(reg.to_uds(socket)).unwrap();
            let (rx, tx) = socket.split();
            let write = future::loop_fn::<_, UnixStream, _, _>(tx, |cc|{
                let next;
                {
                    let mut q = out_q_clone.lock().unwrap();
                    if q.len() == 0{
                        return Timeout::new(Duration::from_millis(100), &handle).unwrap()
                                .and_then(|_| Ok(future::Loop::Continue(cc))).boxed();
                    }
                    next = (*q).remove(0);
                }
                Box::new(next.to_half_uds(cc)
                            .and_then(|c| Ok(future::Loop::Continue(c))))
            });
            let read = future::loop_fn::<_, UnixStream, _, _>(rx, move |cc|{
                let in_q = in_q_clone.clone();
                Box::new(Message::from_half_uds(cc).and_then(move |m| {
                    {                    
                        let mut q = in_q.lock().unwrap();
                        q.push(m.0);
                    }
                    Ok(future::Loop::Continue(m.1))
                }))
            }).and_then(|_| Ok(()));
            let _ = core.run(write.join(read));
        });
        loop{
            let tuple;
            {
                let mut q = self.in_q.lock().unwrap();
                if q.len() == 0{
                    continue;
                }
                tuple = (*q).remove(0);
            }
            bolt.process_tuple(tuple);
        }
    }
}