1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
use crate::{CommitStore, Entity, EntityId, EntityName, Query, CQRS, ES}; use futures::channel::oneshot::{channel, Sender as ChannelSender}; use riker::actors::*; use std::collections::HashMap; use std::sync::{Arc, Mutex}; pub struct Manager { sys: ActorSystem, entities: HashMap<String, BasicActorRef>, } impl Manager { pub fn new(sys: ActorSystem) -> Self { Manager { sys, entities: HashMap::new(), } } pub fn sys(&self) -> &ActorSystem { &self.sys } pub fn register<E, S>(mut self, store: S, args: E::Args) -> Self where E: ES, S: CommitStore<E::Model>, { let entity = self .sys .actor_of_args::<Entity<E, S>, _>(E::NAME, (store, args)) .expect(&format!("create entity {}", E::NAME)); self.entities.insert(E::NAME.into(), entity.into()); self } pub async fn command<C>(&self, cmd: C) -> EntityId where C: Message + EntityName, { let entity = self.entity(<C as EntityName>::NAME); self.ask(entity, CQRS::Cmd(cmd)).await } pub async fn query<E>(&self, id: EntityId) -> Option<E::Model> where E: ES + EntityName, { let entity = self.entity(<E as EntityName>::NAME); let q: CQRS<E::Cmd> = CQRS::Query(Query::One(id.into())); self.ask(entity, q).await } pub fn entity(&self, name: &str) -> BasicActorRef { self.entities.get(name).unwrap().clone() } async fn ask<Msg: Message, R: Message>(&self, entity: BasicActorRef, msg: Msg) -> R { let (tx, rx) = channel::<R>(); let tx = Arc::new(Mutex::new(Some(tx))); let tmp_sender = self.sys.tmp_actor_of_args::<AskActor<R>, _>(tx).unwrap(); entity.try_tell(msg, tmp_sender).expect("can send message"); rx.await.unwrap() } } struct AskActor<Msg> { tx: Arc<Mutex<Option<ChannelSender<Msg>>>>, } impl<Msg: Message> ActorFactoryArgs<Arc<Mutex<Option<ChannelSender<Msg>>>>> for AskActor<Msg> { fn create_args(tx: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> Self { AskActor { tx } } } impl<Msg: Message> Actor for AskActor<Msg> { type Msg = Msg; fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, _: Sender) { if let Ok(mut tx) = self.tx.lock() { tx.take().unwrap().send(msg).unwrap(); } ctx.stop(&ctx.myself); } } #[cfg(test)] mod tests { use super::*; use crate::{macros::*, Event, MemStore, Model}; use async_trait::async_trait; use futures::executor::block_on; #[derive(EntityName, Debug)] struct Entity1; #[derive(Debug, Clone)] struct Model1; impl Model for Model1 { type Change = (); fn id(&self) -> EntityId { "dummy".into() } fn apply_change(&mut self, _change: &Self::Change) {} } impl EntityName for () { const NAME: &'static str = "Entity1"; } #[async_trait] impl ES for Entity1 { type Args = (); type Model = Model1; type Cmd = (); type Error = (); fn new(_cx: &Context<CQRS<Self::Cmd>>, _args: Self::Args) -> Self { Entity1 } async fn handle_command( &mut self, _cmd: Self::Cmd, ) -> Result<crate::Commit<Self::Model>, Self::Error> { Ok(Event::Create(Model1).into()) } } #[test] fn register_entities() { let sys = ActorSystem::new().unwrap(); let mgr = Manager::new(sys).register::<Entity1, _>(MemStore::new(), ()); let id = block_on(mgr.command(())); assert_eq!(id, "dummy".into()); } }