asim 0.1.0

Discrete Event Simulation for Async Rust
Documentation
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

use crate::sync::mpsc;
use crate::{TaskRunner, Timer};

use crate::network::{get_size_delay, Bandwidth, Latency, Link, NetworkMessage};

/// Identifier of a simulated process.
///
/// A thin newtype around `u32`. It is `Copy`, totally ordered and hashable,
/// which lets it be used as the key of the per-process link table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessId(u32);

impl ProcessId {
    /// Builds an identifier from a value drawn via `rand::random`.
    ///
    /// NOTE(review): nothing here guards against two processes drawing the
    /// same value — confirm that this is acceptable for the callers.
    fn random() -> Self {
        let raw: u32 = rand::random();
        Self(raw)
    }
}

impl std::fmt::Display for ProcessId {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        write!(fmt, "#{:x}", self.0)
    }
}

/// Boxed one-shot callback that travels with every message placed in a
/// process inbox.
///
/// The receiving process calls it once, right before the message is handed
/// to its logic.
pub type NotifyDeliveryFn = Box<dyn FnOnce()>;

/// Behaviour that is plugged into a [`Process`].
///
/// The trait has `Any` as a supertrait so that a process can hand the
/// concrete logic type back to callers (see `Process::get_logic_as`).
#[ async_trait::async_trait(?Send) ]
pub trait ProcessLogic<Message: NetworkMessage>: Any {
    /// Called by `Process::new` once the process has been constructed and
    /// before its inbox loop is spawned.
    fn start(&self, process: &Process<Message>);
    /// Called by `Process::stop`.
    fn stop(&self, process: &Process<Message>);

    /// Called for every message taken out of the process inbox.
    ///
    /// `source` is the identifier that was passed along when the message was
    /// delivered. Each call is run in its own spawned task.
    async fn handle_message(&self, process: &Process<Message>, source: ProcessId, message: Message);

    /// Called by `Process::disconnect_all` when the link to `_peer` is torn
    /// down. The default implementation does nothing.
    fn handle_disconnect(&self, _process: &Process<Message>, _peer: ProcessId) {}
}

/// A simulated process: an identifier, an inbox, user-supplied logic and a
/// table of network links to peers.
pub struct Process<Message: NetworkMessage> {
    // Identifier assigned at construction time.
    identifier: ProcessId,
    // Sending half of the inbox channel; `deliver_message` pushes into it.
    inbox_sender: mpsc::Sender<(ProcessId, Message, NotifyDeliveryFn)>,
    // Used by the inbox loop to derive a per-message delay from the message size.
    bandwidth: Bandwidth,
    // Spawns the inbox loop and one task per handled message.
    task_runner: Rc<TaskRunner>,
    // Used by the inbox loop to sleep for the size-dependent delay.
    timer: Rc<Timer>,
    // User-supplied behaviour of this process.
    logic: Box<dyn ProcessLogic<Message>>,
    // Links to connected peers, keyed by the peer's identifier.
    network_links: RefCell<HashMap<ProcessId, Rc<Link<Message>>>>,
}

impl<Message: NetworkMessage> Process<Message> {
    /// Creates a process, starts its logic and spawns its inbox loop.
    ///
    /// The process gets a random [`ProcessId`] and an empty link table.
    /// `logic.start` runs first; only afterwards is the task that drains the
    /// inbox handed to `task_runner`.
    ///
    /// Returns the process behind an `Rc`; the inbox task holds a second
    /// strong reference to it.
    pub fn new(
        bandwidth: Bandwidth,
        logic: Box<dyn ProcessLogic<Message>>,
        task_runner: Rc<TaskRunner>,
        timer: Rc<Timer>,
    ) -> Rc<Self> {
        let (inbox_sender, inbox_receiver) = mpsc::channel();
        let identifier = ProcessId::random();
        let network_links = RefCell::new(HashMap::default());

        let process = Rc::new(Self {
            identifier,
            inbox_sender,
            bandwidth,
            task_runner,
            timer,
            logic,
            network_links,
        });

        process.logic.start(&process);

        // The inbox task owns its own strong reference to the process.
        let inbox_owner = Rc::clone(&process);
        process.task_runner.spawn(async move {
            Self::inbox_loop(inbox_owner, inbox_receiver).await;
        });

        process
    }

    /// Returns the identifier assigned to this process when it was created.
    pub fn get_identifier(&self) -> ProcessId {
        self.identifier
    }

    /// Forwards to the `stop` hook of this process's logic.
    ///
    /// This method itself does nothing else: it neither touches the link
    /// table nor the inbox.
    pub fn stop(&self) {
        self.logic.stop(self);
    }

    /// Tears down every network link of this process.
    ///
    /// For each connected peer, the entry for this process is removed from
    /// the peer's link table; then `handle_disconnect` is invoked on the
    /// peer's logic and on this process's logic. When the first callback
    /// runs, this process's own link table is already empty.
    ///
    /// # Panics
    ///
    /// Panics with "Invalid state" if a stored link does not have the
    /// expected peer as one of its endpoints, and with "Connection did not
    /// exist" if the peer has no entry for this process.
    pub fn disconnect_all(&self) {
        // Move the links out and release the `RefCell` borrow before running
        // any callback. `handle_disconnect` may call `send_to`/`get_link_to`
        // on this process, which would panic with a double-borrow if the
        // table were still mutably borrowed during the loop.
        let links = self.network_links.take();

        for (peer_id, link) in links {
            log::trace!("Disconnecting process {} and {}", self.identifier, peer_id);

            let (proc1, proc2) = link.get_processes();

            // Pick whichever endpoint of the link is the remote side.
            let proc = if proc1.get_identifier() == peer_id {
                proc1
            } else if proc2.get_identifier() == peer_id {
                proc2
            } else {
                panic!("Invalid state");
            };

            // The temporary borrow of the peer's table ends with this
            // statement, so the peer's callback below runs unborrowed too.
            proc.network_links
                .borrow_mut()
                .remove(&self.identifier)
                .expect("Connection did not exist");
            proc.logic.handle_disconnect(&proc, self.identifier);
            self.logic.handle_disconnect(self, peer_id);
        }
    }

    /// Creates a link with the given latency between two processes and
    /// registers it with both of them.
    ///
    /// The link stores only weak references to its endpoints. Each process
    /// files the shared link under the identifier of the other one; an
    /// existing entry under that identifier is replaced.
    pub fn connect(
        task_runner: Rc<TaskRunner>,
        process1: &Rc<Self>,
        process2: &Rc<Self>,
        link_latency: Latency,
    ) {
        let first_id = process1.get_identifier();
        let second_id = process2.get_identifier();

        log::trace!("Connecting process {} and {}", first_id, second_id);

        let endpoint1 = Rc::downgrade(process1);
        let endpoint2 = Rc::downgrade(process2);
        let shared_link = Rc::new(Link::new(
            link_latency,
            endpoint1,
            endpoint2,
            task_runner,
        ));

        let mut links_of_first = process1.network_links.borrow_mut();
        links_of_first.insert(second_id, Rc::clone(&shared_link));
        // Release the first table before touching the second one.
        drop(links_of_first);

        let mut links_of_second = process2.network_links.borrow_mut();
        links_of_second.insert(first_id, shared_link);
    }

    /// Places a message from `source` into this process's inbox.
    ///
    /// The message, its sender and the delivery callback are queued as one
    /// tuple; the inbox loop picks them up later.
    pub(super) fn deliver_message(
        &self,
        source: ProcessId,
        message: Message,
        notify_delivery_fn: NotifyDeliveryFn,
    ) {
        let envelope = (source, message, notify_delivery_fn);
        self.inbox_sender.send(envelope);
    }

    async fn inbox_loop(
        self_ptr: Rc<Self>,
        inbox_receiver: mpsc::Receiver<(ProcessId, Message, NotifyDeliveryFn)>,
    ) {
        loop {
            for (source, message, notify_delivery_fn) in inbox_receiver.recv().await.drain(..) {
                let size = message.get_size();
                let size_delay = get_size_delay(size, self_ptr.bandwidth);

                if !size_delay.is_zero() {
                    self_ptr.timer.sleep_for(size_delay).await;
                }

                notify_delivery_fn();

                let self_ptr2 = self_ptr.clone();
                self_ptr.task_runner.spawn(async move {
                    self_ptr2
                        .logic
                        .handle_message(&*self_ptr2, source, message)
                        .await;
                });
            }
        }
    }

    /// Looks up the link that connects this process to `process_id`.
    ///
    /// Returns a new `Rc` handle to the link, or `None` (after logging a
    /// warning) when no link to that process is registered.
    pub fn get_link_to(&self, process_id: &ProcessId) -> Option<Rc<Link<Message>>> {
        let found = self.network_links.borrow().get(process_id).cloned();

        if found.is_none() {
            log::warn!(
                "There exists no network link from process {} to {process_id}",
                self.identifier
            );
        }

        found
    }

    /// Sends `message` to the process identified by `process_id`.
    ///
    /// Returns `true` when a link to that process exists and the message was
    /// handed to it, `false` when there is no such link (the message is then
    /// dropped).
    pub fn send_to(&self, process_id: &ProcessId, message: Message) -> bool {
        match self.get_link_to(process_id) {
            Some(link) => {
                Link::send(&link, self.identifier, message);
                true
            }
            None => false,
        }
    }

    /// Returns the process logic as its concrete type `T`.
    ///
    /// # Panics
    ///
    /// Panics with "Logic was not of the expected type" if the logic stored
    /// in this process is not a `T`.
    pub fn get_logic_as<T: ProcessLogic<Message>>(&self) -> &'_ T {
        // Dereference the box *before* converting to `&dyn Any`. Coercing
        // `&self.logic` directly would type-erase the
        // `Box<dyn ProcessLogic<_>>` itself, so the downcast to `T` could
        // never succeed. Going from `&dyn ProcessLogic<_>` to `&dyn Any`
        // relies on trait-object upcasting (stable since Rust 1.86).
        let logic_ref: &dyn Any = &*self.logic;

        logic_ref
            .downcast_ref::<T>()
            .expect("Logic was not of the expected type")
    }
}