ng_net/actors/client/
blocks_get.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12use std::sync::Arc;
13
14use async_recursion::async_recursion;
15use async_std::sync::RwLock;
16use async_std::sync::{Mutex, MutexGuard};
17
18use ng_repo::errors::*;
19use ng_repo::log::*;
20use ng_repo::types::{Block, BlockId, OverlayId};
21
22use crate::broker::BROKER;
23use crate::connection::NoiseFSM;
24use crate::server_broker::IServerBroker;
25use crate::types::*;
26use crate::{actor::*, types::ProtocolMessage};
27
28impl BlocksGet {
29    pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
30        Actor::<BlocksGet, Block>::new_responder(id)
31    }
32
33    pub fn overlay(&self) -> &OverlayId {
34        match self {
35            Self::V0(v0) => v0.overlay.as_ref().unwrap(),
36        }
37    }
38    pub fn set_overlay(&mut self, overlay: OverlayId) {
39        match self {
40            Self::V0(v0) => v0.overlay = Some(overlay),
41        }
42    }
43}
44
45impl TryFrom<ProtocolMessage> for BlocksGet {
46    type Error = ProtocolError;
47    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
48        let req: ClientRequestContentV0 = msg.try_into()?;
49        if let ClientRequestContentV0::BlocksGet(a) = req {
50            Ok(a)
51        } else {
52            log_debug!("INVALID {:?}", req);
53            Err(ProtocolError::InvalidValue)
54        }
55    }
56}
57
58impl From<BlocksGet> for ProtocolMessage {
59    fn from(msg: BlocksGet) -> ProtocolMessage {
60        let overlay = *msg.overlay();
61        ProtocolMessage::from_client_request_v0(ClientRequestContentV0::BlocksGet(msg), overlay)
62    }
63}
64
65impl Actor<'_, BlocksGet, Block> {}
66
67#[async_trait::async_trait]
68impl EActor for Actor<'_, BlocksGet, Block> {
69    async fn respond(
70        &mut self,
71        msg: ProtocolMessage,
72        fsm: Arc<Mutex<NoiseFSM>>,
73    ) -> Result<(), ProtocolError> {
74        let req = BlocksGet::try_from(msg)?;
75        let server = { BROKER.read().await.get_server_broker()? };
76        let mut lock = fsm.lock().await;
77        let mut something_was_sent = false;
78
79        #[async_recursion]
80        async fn process_children(
81            children: &Vec<BlockId>,
82            server: &RwLock<dyn IServerBroker + Send + Sync>,
83            overlay: &OverlayId,
84            lock: &mut MutexGuard<'_, NoiseFSM>,
85            req_id: i64,
86            include_children: bool,
87            something_was_sent: &mut bool,
88        ) {
89            for block_id in children {
90                if let Ok(block) = { server.read().await.get_block(overlay, block_id) } {
91                    let grand_children = block.children().to_vec();
92                    if let Err(_) = lock.send_in_reply_to(block.into(), req_id).await {
93                        break;
94                    }
95                    *something_was_sent = true;
96                    if include_children {
97                        process_children(
98                            &grand_children,
99                            server,
100                            overlay,
101                            lock,
102                            req_id,
103                            include_children,
104                            something_was_sent,
105                        )
106                        .await;
107                    }
108                }
109            }
110        }
111        process_children(
112            req.ids(),
113            &server,
114            req.overlay(),
115            &mut lock,
116            self.id(),
117            req.include_children(),
118            &mut something_was_sent,
119        )
120        .await;
121
122        if !something_was_sent {
123            let re: Result<(), ServerError> = Err(ServerError::NotFound);
124            lock.send_in_reply_to(re.into(), self.id()).await?;
125        } else {
126            let re: Result<(), ServerError> = Err(ServerError::EndOfStream);
127            lock.send_in_reply_to(re.into(), self.id()).await?;
128        }
129
130        Ok(())
131    }
132}