1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// cooper/src/actor.rs // // Copyright (c) 2021, Frank Pagliughi <fpagliughi@mindspring.com> // All Rights Reserved // // Licensed under the MIT license: // <LICENSE or http://opensource.org/licenses/MIT> // This file may not be copied, modified, or distributed except according // to those terms. // //! cooper use std::fmt::Debug; use futures::future::BoxFuture; use smol::{ channel::{ self, Sender, Receiver, }, }; /// Message type for the Actor. /// This wraps an async function type that takes a mutable reference to a /// state object. Implementations of actor objects can queue functions and /// closures to process the state. /// `S` is the internal state type for the actor to manage struct Message<S> { func: Box<dyn for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>, } /// The Actor. /// /// This is an async command processor that serializes requests around an /// internal state. Each request runs to completion, atomically, in the /// order received, and thus tasks do not need to lock or protect the state /// for access. #[derive(Clone)] pub struct Actor<S> where S: Send + 'static { /// The channel to send requests to the actor's processor task. tx: Sender<Message<S>>, } impl<S> Actor<S> where S: Default + Send + 'static { /// Create a new actor with a default state pub fn new() -> Self { Self::from_state(S::default()) } } impl<S> Actor<S> where S: Send + 'static { /// Creates a new actor from an initial state pub fn from_state(state: S) -> Self { let (tx, rx) = channel::unbounded(); // TODO: Stash the handle somewhere? // Perhaps make a registry of running actors? smol::spawn(async move { Self::run(state, rx).await }).detach(); Self { tx } } /// The actor's command processor. /// /// This runs each request for the actor to completion before /// running the next one. async fn run(mut state: S, rx: Receiver<Message<S>>) { while let Ok(msg) = rx.recv().await { (msg.func)(&mut state).await; } } /// This is a totally asynchronous opertion. Awaiting the returned /// future only waits for the operation to be placed in the queue. /// It does not wait for the operation to be executed. pub async fn cast<F>(&self, f: F) where F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>, F: 'static + Send, { let msg = Message { func: Box::new(move |state| Box::pin(async move { f(state).await; })) }; // TODO: Should we at least log the error? let _ = self.tx.send(msg).await; } /// A call is a synchronous opertion within the async task. /// It will queue the request, wait for it to execute, and /// return the result. pub async fn call<F, R>(&self, f: F) -> R where F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>, F: 'static + Send, R: 'static + Send + Debug, { let (tx, rx) = channel::bounded(1); let msg = Message { func: Box::new(move |state| Box::pin(async move { let res = f(state).await; let _ = tx.send(res).await; })) }; let _ = self.tx.send(msg).await; // TODO: Return an error instead of panicking rx.recv().await .expect("Actor is gone") } /// Blocks the calling task until all requests up to this point have /// been processed. /// /// Note that if there are clones of the actor, additional requests /// may get queued after this one, so the queue is not guaranteed to be /// empty when this returns; just that all the requests prior to this one /// have completed. pub async fn flush(&self) { self.call(|_| Box::pin(async move {})).await } }