ng_net/actors/client/
pin_repo.rs1use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::repo::Repo;
19use ng_repo::types::*;
20
21use crate::broker::BROKER;
22use crate::connection::NoiseFSM;
23use crate::types::*;
24use crate::{actor::*, types::ProtocolMessage};
25
26impl PinRepo {
27 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
28 Actor::<PinRepo, RepoOpened>::new_responder(id)
29 }
30 pub fn for_branch(repo: &Repo, branch: &BranchId, broker_id: &DirectPeerId) -> PinRepo {
31 let overlay = OverlayAccess::new_write_access_from_store(&repo.store);
32 let mut rw_topics = Vec::with_capacity(1);
33 let mut ro_topics = vec![];
34 let branch = repo.branches.get(branch).unwrap();
35
36 if let Some(privkey) = &branch.topic_priv_key {
37 rw_topics.push(PublisherAdvert::new(
38 branch.topic.unwrap(),
39 privkey.clone(),
40 *broker_id,
41 ));
42 } else {
43 ro_topics.push(branch.topic.unwrap());
44 }
45
46 PinRepo::V0(PinRepoV0 {
47 hash: repo.id.into(),
48 overlay,
49 overlay_root_topic: None,
51 expose_outer: false,
52 peers: vec![],
53 max_peer_count: 0,
54 ro_topics,
56 rw_topics,
57 })
58 }
59 pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo {
60 let overlay = OverlayAccess::new_write_access_from_store(&repo.store);
61 let mut rw_topics = Vec::with_capacity(repo.branches.len());
62 let mut ro_topics = vec![];
63 for (_, branch) in repo.branches.iter() {
64 if let Some(privkey) = &branch.topic_priv_key {
65 rw_topics.push(PublisherAdvert::new(
66 branch.topic.unwrap(),
67 privkey.clone(),
68 *broker_id,
69 ));
70 } else {
71 ro_topics.push(branch.topic.unwrap());
72 }
73 }
74 PinRepo::V0(PinRepoV0 {
75 hash: repo.id.into(),
76 overlay,
77 overlay_root_topic: None,
79 expose_outer: false,
80 peers: vec![],
81 max_peer_count: 0,
82 ro_topics,
84 rw_topics,
85 })
86 }
87}
88
89impl TryFrom<ProtocolMessage> for PinRepo {
90 type Error = ProtocolError;
91 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
92 let req: ClientRequestContentV0 = msg.try_into()?;
93 if let ClientRequestContentV0::PinRepo(a) = req {
94 Ok(a)
95 } else {
96 log_debug!("INVALID {:?}", req);
97 Err(ProtocolError::InvalidValue)
98 }
99 }
100}
101
102impl From<PinRepo> for ProtocolMessage {
103 fn from(msg: PinRepo) -> ProtocolMessage {
104 let overlay = match msg {
105 PinRepo::V0(ref v0) => v0.overlay.overlay_id_for_client_protocol_purpose().clone(),
106 };
107 ProtocolMessage::from_client_request_v0(ClientRequestContentV0::PinRepo(msg), overlay)
108 }
109}
110
111impl TryFrom<ProtocolMessage> for RepoOpened {
112 type Error = ProtocolError;
113 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
114 let res: ClientResponseContentV0 = msg.try_into()?;
115 if let ClientResponseContentV0::RepoOpened(a) = res {
116 Ok(a)
117 } else {
118 log_debug!("INVALID {:?}", res);
119 Err(ProtocolError::InvalidValue)
120 }
121 }
122}
123
124impl From<RepoOpened> for ProtocolMessage {
125 fn from(res: RepoOpened) -> ProtocolMessage {
126 ClientResponseContentV0::RepoOpened(res).into()
127 }
128}
129
130impl Actor<'_, RepoPinStatusReq, RepoPinStatus> {}
131
132#[async_trait::async_trait]
133impl EActor for Actor<'_, PinRepo, RepoOpened> {
134 async fn respond(
135 &mut self,
136 msg: ProtocolMessage,
137 fsm: Arc<Mutex<NoiseFSM>>,
138 ) -> Result<(), ProtocolError> {
139 let req = PinRepo::try_from(msg)?;
140
141 let (sb, server_peer_id) = {
142 let b = BROKER.read().await;
143 (b.get_server_broker()?, b.get_server_peer_id())
144 };
145
146 for pub_ad in req.rw_topics() {
148 pub_ad.verify_for_broker(&server_peer_id)?;
149 }
150
151 let (user_id, remote_peer) = {
152 let fsm = fsm.lock().await;
153 (fsm.user_id()?, fsm.get_client_peer_id()?)
154 };
155
156 let result = {
157 match req.overlay_access() {
158 OverlayAccess::ReadOnly(r) => {
159 if r.is_inner()
160 || req.overlay() != r
161 || req.rw_topics().len() > 0
162 || req.overlay_root_topic().is_some()
163 {
164 Err(ServerError::InvalidRequest)
165 } else {
166 sb.read()
167 .await
168 .pin_repo_read(
169 req.overlay(),
170 req.hash(),
171 &user_id,
172 req.ro_topics(),
173 &remote_peer,
174 )
175 .await
176 }
177 }
178 OverlayAccess::ReadWrite((w, r)) => {
179 if req.overlay() != w
180 || !w.is_inner()
181 || r.is_inner()
182 || req.expose_outer() && req.rw_topics().is_empty()
183 {
184 Err(ServerError::InvalidRequest)
187 } else {
188 sb.read()
189 .await
190 .pin_repo_write(
191 req.overlay_access(),
192 req.hash(),
193 &user_id,
194 req.ro_topics(),
195 req.rw_topics(),
196 req.overlay_root_topic(),
197 req.expose_outer(),
198 &remote_peer,
199 )
200 .await
201 }
202 }
203 OverlayAccess::WriteOnly(w) => {
204 if !w.is_inner() || req.overlay() != w || req.expose_outer() {
205 Err(ServerError::InvalidRequest)
206 } else {
207 sb.read()
208 .await
209 .pin_repo_write(
210 req.overlay_access(),
211 req.hash(),
212 &user_id,
213 req.ro_topics(),
214 req.rw_topics(),
215 req.overlay_root_topic(),
216 false,
217 &remote_peer,
218 )
219 .await
220 }
221 }
222 }
223 };
224 fsm.lock()
225 .await
226 .send_in_reply_to(result.into(), self.id())
227 .await?;
228 Ok(())
229 }
230}