1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// cooper/src/actor.rs
//
// Copyright (c) 2021, Frank Pagliughi <fpagliughi@mindspring.com>
// All Rights Reserved
//
// Licensed under the MIT license:
//   <LICENSE or http://opensource.org/licenses/MIT>
// This file may not be copied, modified, or distributed except according
// to those terms.
//
//! cooper

use std::fmt::Debug;
use futures::future::BoxFuture;
use smol::{
    channel::{
        self,
        Sender,
        Receiver,
    },
};

/// Message type for the Actor.
/// This wraps an async function type that takes a mutable reference to a
/// state object. Implementations of actor objects can queue functions and
/// closures to process the state.
/// `S` is the internal state type for the actor to manage
struct Message<S> {
    func: Box<dyn for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>,
}

/// The Actor.
///
/// This is an async command processor that serializes requests around an
/// internal state. Each request runs to completion, atomically, in the
/// order received, and thus tasks do not need to lock or protect the state
/// for access.
#[derive(Clone)]
pub struct Actor<S>
where
    S: Send + 'static
{
    /// The channel to send requests to the actor's processor task.
    tx: Sender<Message<S>>,
}

impl<S> Actor<S>
where
    S: Default + Send + 'static
{
    /// Create a new actor with a default state
    pub fn new() -> Self {
        Self::from_state(S::default())
    }
}

impl<S> Actor<S>
where
    S: Send + 'static
{
    /// Creates a new actor from an initial state
    pub fn from_state(state: S) -> Self {
        let (tx, rx) = channel::unbounded();

        // TODO: Stash the handle somewhere?
        //  Perhaps make a registry of running actors?
        smol::spawn(async move {
            Self::run(state, rx).await
        }).detach();

        Self { tx }
    }

    /// The actor's command processor.
    ///
    /// This runs each request for the actor to completion before
    /// running the next one.
    async fn run(mut state: S, rx: Receiver<Message<S>>) {
        while let Ok(msg) = rx.recv().await {
            (msg.func)(&mut state).await;
        }
    }

    /// This is a totally asynchronous opertion. Awaiting the returned
    /// future only waits for the operation to be placed in the queue.
    /// It does not wait for the operation to be executed.
    pub async fn cast<F>(&self, f: F)
    where
        F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>,
        F: 'static + Send,
    {
        let msg = Message {
            func: Box::new(move |state| Box::pin(async move {
                f(state).await;
            }))
        };

        // TODO: Should we at least log the error?
        let _ = self.tx.send(msg).await;
    }

    /// A call is a synchronous opertion within the async task.
    /// It will queue the request, wait for it to execute, and
    /// return the result.
    pub async fn call<F, R>(&self, f: F) -> R
    where
        F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>,
        F: 'static + Send,
        R: 'static + Send + Debug,
    {
        let (tx, rx) = channel::bounded(1);
        let msg = Message {
            func: Box::new(move |state| Box::pin(async move {
                let res = f(state).await;
                let _ = tx.send(res).await;
            }))
        };

        let _ = self.tx.send(msg).await;
        // TODO: Return an error instead of panicking
        rx.recv().await
            .expect("Actor is gone")
    }

    /// Blocks the calling task until all requests up to this point have
    /// been processed.
    ///
    /// Note that if there are clones of the actor, additional requests
    /// may get queued after this one, so the queue is not guaranteed to be
    /// empty when this returns; just that all the requests prior to this one
    /// have completed.
    pub async fn flush(&self) {
        self.call(|_| Box::pin(async move {})).await
    }
}