1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use crate::{CommitStore, Entity, EntityId, EntityName, Query, CQRS, ES};
use futures::channel::oneshot::{channel, Sender as ChannelSender};
use riker::actors::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Front door to the entity actors: owns the actor system and keeps a
/// name-indexed table of the entity actors that have been registered on it.
pub struct Manager {
    /// Actor system on which entity actors and temporary ask actors are spawned.
    sys: ActorSystem,
    /// Registered entity actors, keyed by the entity's `NAME` constant.
    entities: HashMap<String, BasicActorRef>,
}

impl Manager {
    /// Creates a manager on top of `sys` with no entities registered yet.
    pub fn new(sys: ActorSystem) -> Self {
        Manager {
            sys,
            entities: HashMap::new(),
        }
    }

    /// Returns the actor system this manager spawns its actors on.
    pub fn sys(&self) -> &ActorSystem {
        &self.sys
    }

    /// Spawns the entity actor for `E`, backed by `store`, and records it under
    /// `E::NAME`. Consumes and returns the manager so calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if the actor system fails to create the entity actor.
    pub fn register<E, S>(mut self, store: S, args: E::Args) -> Self
    where
        E: ES,
        S: CommitStore<E::Model>,
    {
        let entity = self
            .sys
            .actor_of_args::<Entity<E, S>, _>(E::NAME, (store, args))
            // Format the message only on failure; `expect(&format!(..))` would
            // allocate the string on every successful registration as well.
            .unwrap_or_else(|err| panic!("create entity {}: {:?}", E::NAME, err));
        self.entities.insert(E::NAME.into(), entity.into());
        self
    }

    /// Sends `cmd` to the entity actor registered under `C::NAME` and waits for
    /// the `EntityId` it replies with.
    ///
    /// # Panics
    ///
    /// Panics if no entity is registered under that name, or if the request
    /// cannot be delivered or is never answered (see the private `ask` helper).
    pub async fn command<C>(&self, cmd: C) -> EntityId
    where
        C: Message + EntityName,
    {
        let entity = self.entity(<C as EntityName>::NAME);
        self.ask(entity, CQRS::Cmd(cmd)).await
    }

    /// Asks the entity actor registered under `E::NAME` for the model with the
    /// given `id` and returns whatever it replies with.
    ///
    /// NOTE(review): presumably `None` means the entity knows no model with
    /// that id; confirm against the `Entity` actor implementation.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Manager::command`].
    pub async fn query<E>(&self, id: EntityId) -> Option<E::Model>
    where
        E: ES + EntityName,
    {
        let entity = self.entity(<E as EntityName>::NAME);
        let q: CQRS<E::Cmd> = CQRS::Query(Query::One(id.into()));
        self.ask(entity, q).await
    }

    /// Returns a handle to the entity actor registered under `name`.
    ///
    /// # Panics
    ///
    /// Panics if no entity was registered under `name`; the message names the
    /// missing entity so a forgotten `register` call is easy to spot.
    pub fn entity(&self, name: &str) -> BasicActorRef {
        self.entities
            .get(name)
            .cloned()
            .unwrap_or_else(|| panic!("entity {} is not registered", name))
    }

    /// Request/response helper: sends `msg` to `entity` with a temporary actor
    /// as the sender and resolves to the first message that actor receives.
    ///
    /// # Panics
    ///
    /// Panics if the temporary actor cannot be created, if `msg` cannot be
    /// enqueued on `entity`, or if the reply channel is closed without a reply.
    async fn ask<Msg: Message, R: Message>(&self, entity: BasicActorRef, msg: Msg) -> R {
        let (tx, rx) = channel::<R>();
        // The sender lives in a shared `Option` slot so the temporary actor can
        // `take()` it out when the reply arrives.
        let tx = Arc::new(Mutex::new(Some(tx)));
        let tmp_sender = self
            .sys
            .tmp_actor_of_args::<AskActor<R>, _>(tx)
            .expect("create temporary ask actor");

        entity.try_tell(msg, tmp_sender).expect("can send message");
        rx.await.expect("ask actor stopped without sending a reply")
    }
}

/// Temporary actor spawned by `Manager::ask` to receive a single reply of type
/// `Msg` and pass it on through a oneshot channel.
struct AskActor<Msg> {
    /// Shared slot holding the oneshot sender; `recv` takes the sender out of
    /// the slot when a message arrives.
    tx: Arc<Mutex<Option<ChannelSender<Msg>>>>,
}

/// Lets the actor system construct an `AskActor` from the shared reply-sender
/// slot that is passed in as the actor's creation arguments.
impl<Msg: Message> ActorFactoryArgs<Arc<Mutex<Option<ChannelSender<Msg>>>>> for AskActor<Msg> {
    fn create_args(reply_slot: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> Self {
        Self { tx: reply_slot }
    }
}

/// Actor behaviour of `AskActor`: forward the first received message through
/// the oneshot channel, then stop.
impl<Msg: Message> Actor for AskActor<Msg> {
    type Msg = Msg;

    /// Forwards `msg` to the waiting receiver (at most once) and stops this actor.
    ///
    /// Never panics: a poisoned lock is recovered, a message arriving after the
    /// sender was already consumed is dropped, and a receiver that has gone
    /// away (the asking future was dropped) is ignored.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, _: Sender) {
        // Take the sender out of the slot; the guard is released at the end of
        // this statement so the lock is not held while sending.
        let reply_tx = self
            .tx
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();

        if let Some(reply_tx) = reply_tx {
            // `send` only fails when the receiving half was dropped; nobody is
            // waiting for the reply any more, so discarding it is correct.
            let _ = reply_tx.send(msg);
        }
        ctx.stop(&ctx.myself);
    }
}

// End-to-end check of `Manager`: registers a dummy entity on a real actor
// system and round-trips one command through it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{macros::*, Event, MemStore, Model};
    use async_trait::async_trait;
    use futures::executor::block_on;

    // Minimal entity used as the actor under test.
    // NOTE(review): the derive presumably sets `NAME` to "Entity1" — confirm in `macros`.
    #[derive(EntityName, Debug)]
    struct Entity1;
    // Minimal model whose id is always "dummy" and whose changes are no-ops.
    #[derive(Debug, Clone)]
    struct Model1;
    impl Model for Model1 {
        type Change = ();
        fn id(&self) -> EntityId {
            "dummy".into()
        }
        fn apply_change(&mut self, _change: &Self::Change) {}
    }
    // The unit type doubles as `Entity1`'s command; `Manager::command` looks the
    // target actor up by the command's `NAME`, so it is set to "Entity1" here.
    impl EntityName for () {
        const NAME: &'static str = "Entity1";
    }
    #[async_trait]
    impl ES for Entity1 {
        type Args = ();
        type Model = Model1;
        type Cmd = ();
        type Error = ();
        fn new(_cx: &Context<CQRS<Self::Cmd>>, _args: Self::Args) -> Self {
            Entity1
        }
        // Every command results in a commit that creates a fresh `Model1`.
        async fn handle_command(
            &mut self,
            _cmd: Self::Cmd,
        ) -> Result<crate::Commit<Self::Model>, Self::Error> {
            Ok(Event::Create(Model1).into())
        }
    }

    // Registers `Entity1`, sends it the unit command, and expects the returned
    // id to equal the id reported by `Model1::id`.
    #[test]
    fn register_entities() {
        let sys = ActorSystem::new().unwrap();
        let mgr = Manager::new(sys).register::<Entity1, _>(MemStore::new(), ());
        let id = block_on(mgr.command(()));
        assert_eq!(id, "dummy".into());
    }
}