antimony 0.0.1

A topology builder crate for Antimony.
Documentation
use futures::{Future, future};
use tokio_uds::UnixStream;
use tokio_core::reactor::Core;
use std::thread;
use std::sync::{Arc, Mutex};
use components::{Message, ComponentConfig};

/// Behaviour a user-defined spout must provide so it can be driven by
/// `Spout::start`.
pub trait BaseSpout {
    /// One-time setup hook.
    ///
    /// `Spout::start` calls this exactly once, before it spawns its sender
    /// thread and before the first call to `next_tuple`.
    fn prepare(&mut self);

    /// Produces the next unit of work.
    ///
    /// `Spout::start` calls this repeatedly in an endless loop on the calling
    /// thread.
    fn next_tuple(&mut self);
}

/// Handle through which messages are queued for delivery by a spout.
///
/// Cloning a `Spout` clones the `Arc`, so every clone shares the same
/// underlying outgoing queue.
#[derive(Clone)]
pub struct Spout {
    /// Name supplied to `Spout::new`.
    name: String,
    /// Outgoing messages: pushed by `emit`, removed from the front by the
    /// sender thread spawned in `start`.
    out_q: Arc<Mutex<Vec<Message>>>,
}

impl Spout {
    /// Creates a spout called `name` with an empty outgoing queue.
    pub fn new(name: &str) -> Self {
        Spout {
            name: name.to_string(),
            out_q: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Appends `tuple` to the back of the outgoing queue.
    ///
    /// The message is only queued here; the sender thread spawned by `start`
    /// is what takes it off the queue and passes it to `Message::to_uds`.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex has been poisoned.
    pub fn emit(&mut self, tuple: Message) {
        self.out_q.lock().unwrap().push(tuple);
    }

    /// Runs `spout` forever; this function never returns.
    ///
    /// Steps, in order:
    ///
    /// 1. `spout.prepare()` is called once on the current thread.
    /// 2. A background thread is spawned. It creates its own reactor `Core`,
    ///    connects a `UnixStream` to `args.sock_file`, sends
    ///    `Message::Local(args.component_id)` through `to_uds`, and then
    ///    loops: it removes the oldest queued message (if any) and sends it
    ///    through `to_uds`, reusing the stream that call resolves to.
    /// 3. The current thread calls `spout.next_tuple()` in an endless loop.
    ///
    /// When the queue is empty the background thread sleeps for a short
    /// interval before checking again, instead of spinning on the mutex.
    ///
    /// The result of running the forwarding loop is discarded, so an error
    /// that ends it is not reported and the calling thread keeps running.
    ///
    /// # Panics
    ///
    /// The background thread panics if the reactor cannot be created, the
    /// socket cannot be connected, the initial `Message::Local` send fails,
    /// or the queue mutex is poisoned. Those panics are confined to that
    /// thread.
    pub fn start<T>(&mut self, mut spout: T, args: ComponentConfig)
    where
        T: BaseSpout,
    {
        spout.prepare();
        let queue = self.out_q.clone();
        let sock_file = args.sock_file.clone();
        thread::spawn(move || {
            // How long to wait before re-checking an empty queue. Without a
            // pause the loop would spin at full CPU and keep contending with
            // `emit` for the mutex.
            let idle_backoff = ::std::time::Duration::from_millis(1);

            let mut core = Core::new().unwrap();
            let handle = core.handle();
            let socket = UnixStream::connect(sock_file, &handle).unwrap();

            // Send the `Local` message first; the future resolves to the
            // stream, which is then reused for every later message.
            let registration = Message::Local(args.component_id);
            let socket = core.run(registration.to_uds(socket)).unwrap();

            let forward = future::loop_fn::<_, UnixStream, _, _>(socket, |conn| {
                // Hold the lock only long enough to take one message, so it
                // is released before sleeping or sending.
                let pending = {
                    let mut q = queue.lock().unwrap();
                    if q.is_empty() {
                        None
                    } else {
                        Some(q.remove(0))
                    }
                };
                let next = match pending {
                    Some(msg) => msg,
                    None => {
                        // Nothing to send: back off briefly, then poll again.
                        // Blocking is acceptable here because this reactor
                        // runs on its own thread and drives only this future.
                        thread::sleep(idle_backoff);
                        return future::ok(())
                            .and_then(|_| Ok(future::Loop::Continue(conn)))
                            .boxed();
                    }
                };
                Box::new(next.to_uds(conn).and_then(|c| Ok(future::Loop::Continue(c))))
            });
            // The loop never breaks on its own; it only ends on an error,
            // which is deliberately ignored here as in the original design.
            let _ = core.run(forward);
        });
        loop {
            spout.next_tuple();
        }
    }
}