use std::any::Any;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::ready;
use std::pin::Pin;
use futures::{sink, Sink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::Stream;
use tracing::trace;
use crate::scheduled::graph::Hydroflow;
use crate::util::{collect_ready_async, unbounded_channel};
pub type Hostname = String;
type InterfaceName = String;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Address {
host: Hostname,
interface: InterfaceName,
}
impl Address {
pub fn new(host: Hostname, interface: InterfaceName) -> Self {
Address { host, interface }
}
}
pub trait MessageSender {
fn send(&self, message: MessageWithAddress);
}
impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
fn send(&self, message: (Box<dyn Any>, Address)) {
match message.0.downcast::<T>() {
Ok(msg) => {
self.send((*msg, message.1)).unwrap();
}
Err(e) => {
panic!("Failed to downcast message to expected type: {:?}", e);
}
}
}
}
pub type MessageWithAddress = (Box<dyn Any>, Address);
pub struct Inbox {
sender: Box<dyn MessageSender>,
}
pub struct Outbox {
receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}
pub struct Host {
name: Hostname,
transducer: Hydroflow<'static>,
inputs: HashMap<InterfaceName, Inbox>,
output: HashMap<InterfaceName, Outbox>,
}
impl Host {
pub fn run_tick(&mut self) -> bool {
self.transducer.run_tick()
}
}
pub struct HostBuilder {
name: Hostname,
transducer: Option<Hydroflow<'static>>,
inboxes: HashMap<InterfaceName, Inbox>,
outboxes: HashMap<InterfaceName, Outbox>,
}
pub struct TransducerBuilderContext<'context> {
inboxes: &'context mut HashMap<InterfaceName, Inbox>,
outboxes: &'context mut HashMap<InterfaceName, Outbox>,
}
fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
sink::drain().with(move |item| {
(f)(item);
ready(Result::<(), Infallible>::Ok(()))
})
}
impl TransducerBuilderContext<'_> {
pub fn new_inbox<T: 'static>(
&mut self,
interface: InterfaceName,
) -> UnboundedReceiverStream<(T, Address)> {
let (sender, receiver) = unbounded_channel::<(T, Address)>();
self.inboxes.insert(
interface,
Inbox {
sender: Box::new(sender),
},
);
receiver
}
pub fn new_outbox<T: 'static>(
&mut self,
interface: InterfaceName,
) -> impl Sink<(T, Address), Error = Infallible> {
let (sender, receiver) = unbounded_channel::<(T, Address)>();
let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));
self.outboxes.insert(
interface,
Outbox {
receiver: Box::pin(receiver),
},
);
sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
}
}
impl HostBuilder {
pub fn new(name: Hostname) -> Self {
HostBuilder {
name,
transducer: None,
inboxes: Default::default(),
outboxes: Default::default(),
}
}
pub fn with_transducer<F>(mut self, builder: F) -> Self
where
F: FnOnce(&mut TransducerBuilderContext) -> Hydroflow<'static>,
{
let mut context = TransducerBuilderContext {
inboxes: &mut self.inboxes,
outboxes: &mut self.outboxes,
};
let transducer = builder(&mut context);
self.transducer = Some(transducer);
self
}
pub fn build(self) -> Host {
if self.transducer.is_none() {
panic!("Transducer is required to build a host");
}
Host {
name: self.name,
transducer: self.transducer.unwrap(),
inputs: self.inboxes,
output: self.outboxes,
}
}
}
pub struct Fleet {
hosts: HashMap<String, Host>,
}
impl Fleet {
pub fn new() -> Self {
Fleet {
hosts: HashMap::new(),
}
}
pub fn add_host<F>(&mut self, name: String, transducer_builder: F) -> &Host
where
F: FnOnce(&mut TransducerBuilderContext) -> Hydroflow<'static>,
{
let host = HostBuilder::new(name.clone())
.with_transducer(transducer_builder)
.build();
assert!(
self.hosts.insert(host.name.clone(), host).is_none(),
"Host with name {} already exists",
name
);
self.get_host(&name).unwrap()
}
pub fn get_host(&self, name: &str) -> Option<&Host> {
self.hosts.get(name)
}
pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
self.hosts.get_mut(name)
}
pub async fn run_single_tick_all_hosts(&mut self) -> bool {
let mut work_done: bool = false;
for (name, host) in self.hosts.iter_mut() {
trace!("Running tick for host: {}", name);
work_done |= host.run_tick();
}
self.process_network().await;
work_done
}
pub async fn process_network(&mut self) {
let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();
for (name, host) in self.hosts.iter_mut() {
for (interface, output) in host.output.iter_mut() {
let src_address = Address::new(name.clone(), interface.clone());
let all_messages_on_interface: Vec<_> =
collect_ready_async(&mut output.receiver).await;
for message_on_interface in all_messages_on_interface {
all_messages.push((src_address.clone(), message_on_interface));
}
}
}
for (src_address, (msg, addr)) in all_messages {
if let Some(destination_host) = self.hosts.get(&addr.host) {
if let Some(input) = destination_host.inputs.get(&addr.interface) {
input.sender.send((msg, src_address.clone()));
} else {
trace!(
"No interface named {:?} found on host {:?}. Dropping message {:?}.",
addr.interface,
addr.host,
msg
);
}
} else {
trace!(
"No host named {:?} found. Dropping message {:?}.",
addr.host,
msg
);
}
}
}
pub async fn run_until_quiescent(&mut self) {
while self.run_single_tick_all_hosts().await {}
}
}
impl Default for Fleet {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use hydroflow_macro::{hydroflow_syntax, hydroflow_test};
use crate::util::simulation::{Address, Fleet, Hostname};
use crate::util::unbounded_channel;
#[hydroflow_test]
async fn test_echo() {
let mut fleet = Fleet::new();
let server: Hostname = "server".to_string();
let client: Hostname = "client".to_string();
let interface: String = "echo".to_string();
let server_address = Address::new(server.clone(), interface.clone());
fleet.add_host(server.clone(), |ctx| {
let network_input = ctx.new_inbox::<String>(interface.clone());
let network_output = ctx.new_outbox::<String>(interface.clone());
hydroflow_syntax! {
out = dest_sink(network_output);
source_stream(network_input)
-> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
-> out;
}
});
let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();
fleet.add_host(client.clone(), |ctx| {
let network_out = ctx.new_outbox::<String>(interface.clone());
let network_in = ctx.new_inbox::<String>(interface.clone());
hydroflow_syntax! {
out = dest_sink(network_out);
source_stream(client_trigger_rx)
-> map(|msg| (msg, server_address.clone()))
-> out;
source_stream(network_in)
-> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
-> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());
}
});
client_trigger_tx.send("Hello, world!".to_string()).unwrap();
fleet.run_until_quiescent().await;
let response = client_response_rx.next().await.unwrap();
assert_eq!(response, "Hello, world!");
}
}