ak 0.1.1

Actor library utilizing std futures
Documentation
use crate::prelude::*;
use crate::actor::Actor;
use crate::addr::{Addr, Envelope, Message, BoxEnvelope};

use std::{
    collections::LinkedList,
    cell::{RefCell, UnsafeCell},
    sync::Arc,
    task,
    mem,
    ptr::NonNull,
};
use futures::{
    select,
    Stream,
    channel::mpsc::{Sender, Receiver},
    SinkExt,
    StreamExt,
    stream::FuturesUnordered
};
use rt::Runtime;




pub type ActorItem<A> = LocalBoxFuture<'static, Option<BoxEnvelope<A>>>;


pub struct Context<A: Actor> {
    stack: usize,
    sender: Sender<BoxEnvelope<A>>,
    mailbox: Receiver<BoxEnvelope<A>>,
    items: FuturesUnordered<ActorItem<A>>,
    running: bool,
    actor: A,
}

impl<A: Actor> Deref for Context<A> {
    type Target = A;
    fn deref(&self) -> &A {
        &self.actor
    }
}

impl<A: Actor> DerefMut for Context<A> {
    fn deref_mut(&mut self) -> &mut A {
        &mut self.actor
    }
}


pub async fn dispatch<M: Message, A: Actor + crate::actor::Handler<M>>(ctx: &mut Context<A>, msg: M, mut tx: oneshot::Sender<M::Result>)
{
    use crate::actor::Handler;
    let mut ctx_ref = ContextRef::from_ctx_ref(ctx);
    let fut = ctx_ref.handle(msg);

    ctx.items.push(async {
        tx.send(fut.await);
        None
    }.boxed_local());
}


pub struct ContextRef<A: Actor> {
    data: *mut Context<A>,
}

impl<A: Actor> ContextRef<A> {
    pub(crate) fn from_ctx_ref(ctx: &mut Context<A>) -> Self {
        ContextRef {
            data: ctx as *mut _
        }
    }
}

impl<A: Actor> Deref for ContextRef<A> {
    type Target = Context<A>;
    fn deref(&self) -> &Context<A> {
        unsafe { self.data.as_ref().unwrap() }
    }
}

impl<A: Actor> DerefMut for ContextRef<A> {
    fn deref_mut(&mut self) -> &mut Context<A> {
        unsafe { self.data.as_mut().unwrap() }
    }
}


impl<A: Actor> Context<A>
{
    async fn into_future(mut self) {
        loop {
            if !self.running {
                return;
            }
            let mut next_msg = (&mut self.mailbox).next();
            pin_utils::pin_mut!(next_msg);

            let mut next_item = (&mut self.items).next();
            pin_utils::pin_mut!(next_item);

            match select(next_msg, next_item).await {
                Either::Left((msg, _)) => {
                    if let Some(msg) = msg {
                        println!("Opening new message");
                        msg.open(&mut self).await;
                    } else {
                        println!("Mailbox has closed");
                        return;
                    }
                }
                Either::Right((item, _)) => {
                    if let Some(env_opt) = item {
                        println!("Internal future completed");
                        if let Some(envelope) = env_opt {
                            envelope.open(&mut self).await;
                        }
                    } else {
                        println!("Internal future none");
                    }
                }
            }
        }
    }


    pub fn sender(&self) -> Sender<BoxEnvelope<A>> {
        self.sender.clone()
    }


    pub fn start<F: FnOnce() -> A + Send + 'static>(create: F) -> Addr<A> {
        let (tx, rx) = futures::channel::mpsc::channel(100);

        let sender = tx.clone();


        std::thread::spawn(move || {
            let mut items = FuturesUnordered::new();
            items.push(pending().boxed_local());

            let mut context = Context {
                stack: 0,
                actor: create(),
                sender,
                mailbox: rx,
                items,
                running: true,
            };

            /*
                        let mut rt = tokio::runtime::current_thread::Builder::new().build().unwrap();

                        rt.spawn(context.into_future());
                        rt.run().unwrap();
                        */
        });

        return Addr { sender: tx };
    }

    pub fn stop(&mut self) {
        self.running = false;
    }

    pub async fn suspend<F>(mut self: ContextRef<A>, f: F) -> (ContextRef<A>, F::Output)
        where F: Future + 'static,

    {
        (self, f.await)
    }
}