ng_net/actors/client/
topic_sub.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::repo::{BranchInfo, Repo};
19use ng_repo::types::*;
20
21use crate::broker::BROKER;
22use crate::connection::NoiseFSM;
23use crate::types::*;
24use crate::{actor::*, types::ProtocolMessage};
25
26impl TopicSub {
27    pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
28        Actor::<TopicSub, TopicSubRes>::new_responder(id)
29    }
30    /// only set broker_id if you want to be a publisher
31    pub fn new(repo: &Repo, branch: &BranchInfo, broker_id: Option<&DirectPeerId>) -> TopicSub {
32        let (overlay, publisher) = if broker_id.is_some() && branch.topic_priv_key.is_some() {
33            (
34                repo.store.inner_overlay(),
35                Some(PublisherAdvert::new(
36                    branch.topic.unwrap(),
37                    branch.topic_priv_key.to_owned().unwrap(),
38                    *broker_id.unwrap(),
39                )),
40            )
41        } else {
42            (repo.store.inner_overlay(), None)
43        };
44
45        TopicSub::V0(TopicSubV0 {
46            repo_hash: repo.id.into(),
47            overlay: Some(overlay),
48            topic: branch.topic.unwrap(),
49            publisher,
50        })
51    }
52}
53
54impl TryFrom<ProtocolMessage> for TopicSub {
55    type Error = ProtocolError;
56    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
57        let req: ClientRequestContentV0 = msg.try_into()?;
58        if let ClientRequestContentV0::TopicSub(a) = req {
59            Ok(a)
60        } else {
61            log_debug!("INVALID {:?}", req);
62            Err(ProtocolError::InvalidValue)
63        }
64    }
65}
66
67impl From<TopicSub> for ProtocolMessage {
68    fn from(msg: TopicSub) -> ProtocolMessage {
69        let overlay = *msg.overlay();
70        ProtocolMessage::from_client_request_v0(ClientRequestContentV0::TopicSub(msg), overlay)
71    }
72}
73
74impl TryFrom<ProtocolMessage> for TopicSubRes {
75    type Error = ProtocolError;
76    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
77        let res: ClientResponseContentV0 = msg.try_into()?;
78        if let ClientResponseContentV0::TopicSubRes(a) = res {
79            Ok(a)
80        } else {
81            log_debug!("INVALID {:?}", res);
82            Err(ProtocolError::InvalidValue)
83        }
84    }
85}
86
87impl From<TopicSubRes> for ProtocolMessage {
88    fn from(res: TopicSubRes) -> ProtocolMessage {
89        ClientResponseContentV0::TopicSubRes(res).into()
90    }
91}
92
// No inherent methods are defined for the `TopicSub` actor; its request
// handling lives in the `EActor` implementation for the same type.
impl Actor<'_, TopicSub, TopicSubRes> {}
94
95#[async_trait::async_trait]
96impl EActor for Actor<'_, TopicSub, TopicSubRes> {
97    async fn respond(
98        &mut self,
99        msg: ProtocolMessage,
100        fsm: Arc<Mutex<NoiseFSM>>,
101    ) -> Result<(), ProtocolError> {
102        let req = TopicSub::try_from(msg)?;
103
104        let (sb, server_peer_id) = {
105            let b = BROKER.read().await;
106            (b.get_server_broker()?, b.get_server_peer_id())
107        };
108
109        // check the validity of the PublisherAdvert. this will return a ProtocolError (will close the connection)
110        if let Some(advert) = req.publisher() {
111            advert.verify_for_broker(&server_peer_id)?;
112        }
113
114        let (user_id, remote_peer) = {
115            let fsm = fsm.lock().await;
116            (fsm.user_id()?, fsm.get_client_peer_id()?)
117        };
118
119        let res = {
120            sb.read()
121                .await
122                .topic_sub(
123                    req.overlay(),
124                    req.hash(),
125                    req.topic(),
126                    &user_id,
127                    req.publisher(),
128                    &remote_peer,
129                )
130                .await
131        };
132
133        fsm.lock()
134            .await
135            .send_in_reply_to(res.into(), self.id())
136            .await?;
137        Ok(())
138    }
139}