// ng_net/actors/client/topic_sub.rs
use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::repo::{BranchInfo, Repo};
19use ng_repo::types::*;
20
21use crate::broker::BROKER;
22use crate::connection::NoiseFSM;
23use crate::types::*;
24use crate::{actor::*, types::ProtocolMessage};
25
26impl TopicSub {
27 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
28 Actor::<TopicSub, TopicSubRes>::new_responder(id)
29 }
30 pub fn new(repo: &Repo, branch: &BranchInfo, broker_id: Option<&DirectPeerId>) -> TopicSub {
32 let (overlay, publisher) = if broker_id.is_some() && branch.topic_priv_key.is_some() {
33 (
34 repo.store.inner_overlay(),
35 Some(PublisherAdvert::new(
36 branch.topic.unwrap(),
37 branch.topic_priv_key.to_owned().unwrap(),
38 *broker_id.unwrap(),
39 )),
40 )
41 } else {
42 (repo.store.inner_overlay(), None)
43 };
44
45 TopicSub::V0(TopicSubV0 {
46 repo_hash: repo.id.into(),
47 overlay: Some(overlay),
48 topic: branch.topic.unwrap(),
49 publisher,
50 })
51 }
52}
53
54impl TryFrom<ProtocolMessage> for TopicSub {
55 type Error = ProtocolError;
56 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
57 let req: ClientRequestContentV0 = msg.try_into()?;
58 if let ClientRequestContentV0::TopicSub(a) = req {
59 Ok(a)
60 } else {
61 log_debug!("INVALID {:?}", req);
62 Err(ProtocolError::InvalidValue)
63 }
64 }
65}
66
67impl From<TopicSub> for ProtocolMessage {
68 fn from(msg: TopicSub) -> ProtocolMessage {
69 let overlay = *msg.overlay();
70 ProtocolMessage::from_client_request_v0(ClientRequestContentV0::TopicSub(msg), overlay)
71 }
72}
73
74impl TryFrom<ProtocolMessage> for TopicSubRes {
75 type Error = ProtocolError;
76 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
77 let res: ClientResponseContentV0 = msg.try_into()?;
78 if let ClientResponseContentV0::TopicSubRes(a) = res {
79 Ok(a)
80 } else {
81 log_debug!("INVALID {:?}", res);
82 Err(ProtocolError::InvalidValue)
83 }
84 }
85}
86
87impl From<TopicSubRes> for ProtocolMessage {
88 fn from(res: TopicSubRes) -> ProtocolMessage {
89 ClientResponseContentV0::TopicSubRes(res).into()
90 }
91}
92
93impl Actor<'_, TopicSub, TopicSubRes> {}
94
95#[async_trait::async_trait]
96impl EActor for Actor<'_, TopicSub, TopicSubRes> {
97 async fn respond(
98 &mut self,
99 msg: ProtocolMessage,
100 fsm: Arc<Mutex<NoiseFSM>>,
101 ) -> Result<(), ProtocolError> {
102 let req = TopicSub::try_from(msg)?;
103
104 let (sb, server_peer_id) = {
105 let b = BROKER.read().await;
106 (b.get_server_broker()?, b.get_server_peer_id())
107 };
108
109 if let Some(advert) = req.publisher() {
111 advert.verify_for_broker(&server_peer_id)?;
112 }
113
114 let (user_id, remote_peer) = {
115 let fsm = fsm.lock().await;
116 (fsm.user_id()?, fsm.get_client_peer_id()?)
117 };
118
119 let res = {
120 sb.read()
121 .await
122 .topic_sub(
123 req.overlay(),
124 req.hash(),
125 req.topic(),
126 &user_id,
127 req.publisher(),
128 &remote_peer,
129 )
130 .await
131 };
132
133 fsm.lock()
134 .await
135 .send_in_reply_to(res.into(), self.id())
136 .await?;
137 Ok(())
138 }
139}