ng_net/actors/client/
event.rs1use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::types::*;
19
20#[cfg(not(target_arch = "wasm32"))]
21use crate::broker::BROKER;
22use crate::connection::NoiseFSM;
23use crate::types::*;
24use crate::{actor::*, types::ProtocolMessage};
25
26impl PublishEvent {
27 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
28 Actor::<PublishEvent, ()>::new_responder(id)
29 }
30
31 pub fn new(event: Event, overlay: OverlayId) -> PublishEvent {
32 PublishEvent(event, Some(overlay))
33 }
34 pub fn set_overlay(&mut self, overlay: OverlayId) {
35 self.1 = Some(overlay);
36 }
37
38 pub fn overlay(&self) -> &OverlayId {
39 self.1.as_ref().unwrap()
40 }
41 pub fn event(&self) -> &Event {
42 &self.0
43 }
44 pub fn take_event(self) -> Event {
45 self.0
46 }
47}
48
49impl TryFrom<ProtocolMessage> for PublishEvent {
50 type Error = ProtocolError;
51 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
52 let req: ClientRequestContentV0 = msg.try_into()?;
53 if let ClientRequestContentV0::PublishEvent(a) = req {
54 Ok(a)
55 } else {
56 log_debug!("INVALID {:?}", req);
57 Err(ProtocolError::InvalidValue)
58 }
59 }
60}
61
62impl From<PublishEvent> for ProtocolMessage {
63 fn from(msg: PublishEvent) -> ProtocolMessage {
64 let overlay = msg.1.unwrap();
65 ProtocolMessage::from_client_request_v0(ClientRequestContentV0::PublishEvent(msg), overlay)
66 }
67}
68
69impl Actor<'_, PublishEvent, ()> {}
70
71#[async_trait::async_trait]
72impl EActor for Actor<'_, PublishEvent, ()> {
73 async fn respond(
74 &mut self,
75 _msg: ProtocolMessage,
76 _fsm: Arc<Mutex<NoiseFSM>>,
77 ) -> Result<(), ProtocolError> {
78 #[cfg(not(target_arch = "wasm32"))]
79 {
80 let req = PublishEvent::try_from(_msg)?;
81 req.event().verify()?;
83 let overlay = req.overlay().clone();
84 let (user_id, remote_peer) = {
85 let fsm = _fsm.lock().await;
86 (
87 fsm.user_id()?,
88 fsm.remote_peer().ok_or(ProtocolError::ActorError)?,
89 )
90 };
91 let res = {
92 let broker = BROKER.read().await;
93 broker
94 .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer)
95 .await
96 };
97 if res.is_err() {
98 let res: Result<(), ServerError> = Err(res.unwrap_err());
99 _fsm.lock()
100 .await
101 .send_in_reply_to(res.into(), self.id())
102 .await?;
103 } else {
104 let broker = { BROKER.read().await.get_server_broker()? };
105 for client in res.unwrap() {
106 broker
107 .read()
108 .await
109 .remove_all_subscriptions_of_client(&client)
110 .await;
111 }
112 let finalres: Result<(), ServerError> = Ok(());
113 _fsm.lock()
114 .await
115 .send_in_reply_to(finalres.into(), self.id())
116 .await?;
117 }
118 }
119 Ok(())
120 }
121}