use std::collections::HashMap;
use std::sync::Arc;
use serde_json;
use std::marker::PhantomData;
use crate::core::{ChoreographyLocation, HCons, LocationSet, Portable, Transport};
use crate::utils::queue::BlockingQueue;
type QueueMap = HashMap<String, HashMap<String, BlockingQueue<String>>>;
pub struct LocalTransportChannel<L: LocationSet> {
location_set: std::marker::PhantomData<L>,
queue_map: Arc<QueueMap>,
}
impl<L: LocationSet> Clone for LocalTransportChannel<L> {
fn clone(&self) -> Self {
LocalTransportChannel {
location_set: PhantomData,
queue_map: self.queue_map.clone(),
}
}
}
impl<L: LocationSet> LocalTransportChannel<L> {
pub fn new() -> LocalTransportChannel<L> {
let mut queue_map: QueueMap = HashMap::new();
let str_list = L::to_string_list();
for sender in &str_list {
let mut n = HashMap::new();
for receiver in &str_list {
n.insert(receiver.to_string(), BlockingQueue::new());
}
queue_map.insert(sender.to_string(), n);
}
LocalTransportChannel {
location_set: PhantomData,
queue_map: queue_map.into(),
}
}
}
pub struct LocalTransportChannelBuilder<L: LocationSet> {
location_set: PhantomData<L>,
}
impl LocalTransportChannelBuilder<LocationSet!()> {
pub fn new() -> Self {
Self {
location_set: PhantomData,
}
}
}
impl<L: LocationSet> LocalTransportChannelBuilder<L> {
pub fn with<NewLocation: ChoreographyLocation>(
&self,
location: NewLocation,
) -> LocalTransportChannelBuilder<HCons<NewLocation, L>> {
_ = location;
LocalTransportChannelBuilder {
location_set: PhantomData,
}
}
pub fn build(&self) -> LocalTransportChannel<L> {
LocalTransportChannel::new()
}
}
pub struct LocalTransport<L: LocationSet, TargetLocation> {
internal_locations: Vec<&'static str>,
location_set: PhantomData<L>,
local_channel: LocalTransportChannel<L>,
target_location: PhantomData<TargetLocation>,
}
impl<L: LocationSet, TargetLocation> LocalTransport<L, TargetLocation> {
pub fn new(target: TargetLocation, local_channel: LocalTransportChannel<L>) -> Self {
_ = target;
LocalTransport {
internal_locations: L::to_string_list(),
location_set: PhantomData,
local_channel,
target_location: PhantomData,
}
}
}
impl<L: LocationSet, TargetLocation: ChoreographyLocation> Transport<L, TargetLocation>
for LocalTransport<L, TargetLocation>
{
fn locations(&self) -> Vec<&'static str> {
return self.internal_locations.clone();
}
fn send<T: Portable>(&self, from: &str, to: &str, data: &T) -> () {
let data = serde_json::to_string(data).unwrap();
self.local_channel
.queue_map
.get(from)
.unwrap()
.get(to)
.unwrap()
.push(data)
}
fn receive<T: Portable>(&self, from: &str, at: &str) -> T {
let data = self
.local_channel
.queue_map
.get(from)
.unwrap()
.get(at)
.unwrap()
.pop();
serde_json::from_str(&data).unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::ChoreographyLocation;
use std::thread;
#[derive(ChoreographyLocation)]
struct Alice;
#[derive(ChoreographyLocation)]
struct Bob;
#[test]
fn test_local_transport() {
let v = 42;
let transport_channel = LocalTransportChannelBuilder::new()
.with(Alice)
.with(Bob)
.build();
let mut handles = Vec::new();
{
let transport = LocalTransport::new(Alice, transport_channel.clone());
handles.push(thread::spawn(move || {
transport.send::<i32>(Alice::name(), Bob::name(), &v);
}));
}
{
let transport = LocalTransport::new(Bob, transport_channel.clone());
handles.push(thread::spawn(move || {
let v2 = transport.receive::<i32>(Alice::name(), Bob::name());
assert_eq!(v, v2);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
}