ng_net/actors/client/
pin_repo.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::repo::Repo;
19use ng_repo::types::*;
20
21use crate::broker::BROKER;
22use crate::connection::NoiseFSM;
23use crate::types::*;
24use crate::{actor::*, types::ProtocolMessage};
25
26impl PinRepo {
27    pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
28        Actor::<PinRepo, RepoOpened>::new_responder(id)
29    }
30    pub fn for_branch(repo: &Repo, branch: &BranchId, broker_id: &DirectPeerId) -> PinRepo {
31        let overlay = OverlayAccess::new_write_access_from_store(&repo.store);
32        let mut rw_topics = Vec::with_capacity(1);
33        let mut ro_topics = vec![];
34        let branch = repo.branches.get(branch).unwrap();
35
36        if let Some(privkey) = &branch.topic_priv_key {
37            rw_topics.push(PublisherAdvert::new(
38                branch.topic.unwrap(),
39                privkey.clone(),
40                *broker_id,
41            ));
42        } else {
43            ro_topics.push(branch.topic.unwrap());
44        }
45
46        PinRepo::V0(PinRepoV0 {
47            hash: repo.id.into(),
48            overlay,
49            // TODO: overlay_root_topic
50            overlay_root_topic: None,
51            expose_outer: false,
52            peers: vec![],
53            max_peer_count: 0,
54            //allowed_peers: vec![],
55            ro_topics,
56            rw_topics,
57        })
58    }
59    pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo {
60        let overlay = OverlayAccess::new_write_access_from_store(&repo.store);
61        let mut rw_topics = Vec::with_capacity(repo.branches.len());
62        let mut ro_topics = vec![];
63        for (_, branch) in repo.branches.iter() {
64            if let Some(privkey) = &branch.topic_priv_key {
65                rw_topics.push(PublisherAdvert::new(
66                    branch.topic.unwrap(),
67                    privkey.clone(),
68                    *broker_id,
69                ));
70            } else {
71                ro_topics.push(branch.topic.unwrap());
72            }
73        }
74        PinRepo::V0(PinRepoV0 {
75            hash: repo.id.into(),
76            overlay,
77            // TODO: overlay_root_topic
78            overlay_root_topic: None,
79            expose_outer: false,
80            peers: vec![],
81            max_peer_count: 0,
82            //allowed_peers: vec![],
83            ro_topics,
84            rw_topics,
85        })
86    }
87}
88
89impl TryFrom<ProtocolMessage> for PinRepo {
90    type Error = ProtocolError;
91    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
92        let req: ClientRequestContentV0 = msg.try_into()?;
93        if let ClientRequestContentV0::PinRepo(a) = req {
94            Ok(a)
95        } else {
96            log_debug!("INVALID {:?}", req);
97            Err(ProtocolError::InvalidValue)
98        }
99    }
100}
101
102impl From<PinRepo> for ProtocolMessage {
103    fn from(msg: PinRepo) -> ProtocolMessage {
104        let overlay = match msg {
105            PinRepo::V0(ref v0) => v0.overlay.overlay_id_for_client_protocol_purpose().clone(),
106        };
107        ProtocolMessage::from_client_request_v0(ClientRequestContentV0::PinRepo(msg), overlay)
108    }
109}
110
111impl TryFrom<ProtocolMessage> for RepoOpened {
112    type Error = ProtocolError;
113    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
114        let res: ClientResponseContentV0 = msg.try_into()?;
115        if let ClientResponseContentV0::RepoOpened(a) = res {
116            Ok(a)
117        } else {
118            log_debug!("INVALID {:?}", res);
119            Err(ProtocolError::InvalidValue)
120        }
121    }
122}
123
124impl From<RepoOpened> for ProtocolMessage {
125    fn from(res: RepoOpened) -> ProtocolMessage {
126        ClientResponseContentV0::RepoOpened(res).into()
127    }
128}
129
130impl Actor<'_, RepoPinStatusReq, RepoPinStatus> {}
131
132#[async_trait::async_trait]
133impl EActor for Actor<'_, PinRepo, RepoOpened> {
134    async fn respond(
135        &mut self,
136        msg: ProtocolMessage,
137        fsm: Arc<Mutex<NoiseFSM>>,
138    ) -> Result<(), ProtocolError> {
139        let req = PinRepo::try_from(msg)?;
140
141        let (sb, server_peer_id) = {
142            let b = BROKER.read().await;
143            (b.get_server_broker()?, b.get_server_peer_id())
144        };
145
146        // check the validity of the PublisherAdvert(s). this will return a ProtocolError (will close the connection)
147        for pub_ad in req.rw_topics() {
148            pub_ad.verify_for_broker(&server_peer_id)?;
149        }
150
151        let (user_id, remote_peer) = {
152            let fsm = fsm.lock().await;
153            (fsm.user_id()?, fsm.get_client_peer_id()?)
154        };
155
156        let result = {
157            match req.overlay_access() {
158                OverlayAccess::ReadOnly(r) => {
159                    if r.is_inner()
160                        || req.overlay() != r
161                        || req.rw_topics().len() > 0
162                        || req.overlay_root_topic().is_some()
163                    {
164                        Err(ServerError::InvalidRequest)
165                    } else {
166                        sb.read()
167                            .await
168                            .pin_repo_read(
169                                req.overlay(),
170                                req.hash(),
171                                &user_id,
172                                req.ro_topics(),
173                                &remote_peer,
174                            )
175                            .await
176                    }
177                }
178                OverlayAccess::ReadWrite((w, r)) => {
179                    if req.overlay() != w
180                        || !w.is_inner()
181                        || r.is_inner()
182                        || req.expose_outer() && req.rw_topics().is_empty()
183                    {
184                        // we do not allow to expose_outer if not a publisher for at least one topic
185                        // TODO add a check on "|| overlay_root_topic.is_none()"  because it should be mandatory to have one (not sent by client at the moment)
186                        Err(ServerError::InvalidRequest)
187                    } else {
188                        sb.read()
189                            .await
190                            .pin_repo_write(
191                                req.overlay_access(),
192                                req.hash(),
193                                &user_id,
194                                req.ro_topics(),
195                                req.rw_topics(),
196                                req.overlay_root_topic(),
197                                req.expose_outer(),
198                                &remote_peer,
199                            )
200                            .await
201                    }
202                }
203                OverlayAccess::WriteOnly(w) => {
204                    if !w.is_inner() || req.overlay() != w || req.expose_outer() {
205                        Err(ServerError::InvalidRequest)
206                    } else {
207                        sb.read()
208                            .await
209                            .pin_repo_write(
210                                req.overlay_access(),
211                                req.hash(),
212                                &user_id,
213                                req.ro_topics(),
214                                req.rw_topics(),
215                                req.overlay_root_topic(),
216                                false,
217                                &remote_peer,
218                            )
219                            .await
220                    }
221                }
222            }
223        };
224        fsm.lock()
225            .await
226            .send_in_reply_to(result.into(), self.id())
227            .await?;
228        Ok(())
229    }
230}