use std::sync::{Arc,Mutex};
use std::thread;
use std::sync::mpsc;
use slab::*;
use super::*;
#[derive(Clone)]
pub struct LocalDirect {
shared: Arc<Mutex<Internal>>,
}
struct Internal {
tx_threads: Vec<thread::JoinHandle<()>>
}
impl LocalDirect {
pub fn new () -> Self{
LocalDirect {
shared: Arc::new(Mutex::new(
Internal {
tx_threads: Vec::new()
}
))
}
}
}
impl Transport for LocalDirect {
fn is_local (&self) -> bool {
true
}
fn make_transmitter (&self, args: &TransmitterArgs ) -> Option<Transmitter> {
if let &TransmitterArgs::Local(rcv_slab) = args {
let slab = rcv_slab.weak();
let (tx_channel, rx_channel) = mpsc::channel::<(SlabRef,MemoRef)>();
let tx_thread : thread::JoinHandle<()> = thread::spawn(move || {
while let Ok((from_slabref, memoref)) = rx_channel.recv() {
if let Some(slab) = slab.upgrade(){
let owned_slabref = from_slabref.clone_for_slab(&slab);
memoref.clone_for_slab(&owned_slabref, &slab, true);
}
}
});
self.shared.lock().unwrap().tx_threads.push(tx_thread);
Some(Transmitter::new_local(args.get_slab_id(), Mutex::new(tx_channel)))
}else{
None
}
}
fn bind_network(&self, _net: &Network) {}
fn unbind_network(&self, _net: &Network) {}
fn get_return_address ( &self, address: &TransportAddress ) -> Option<TransportAddress> {
if let TransportAddress::Local = *address {
Some(TransportAddress::Local)
}else{
None
}
}
}
impl Drop for Internal {
fn drop (&mut self) {
}
}