ng_net/actors/client/
blocks_get.rs1use std::sync::Arc;
13
14use async_recursion::async_recursion;
15use async_std::sync::RwLock;
16use async_std::sync::{Mutex, MutexGuard};
17
18use ng_repo::errors::*;
19use ng_repo::log::*;
20use ng_repo::types::{Block, BlockId, OverlayId};
21
22use crate::broker::BROKER;
23use crate::connection::NoiseFSM;
24use crate::server_broker::IServerBroker;
25use crate::types::*;
26use crate::{actor::*, types::ProtocolMessage};
27
28impl BlocksGet {
29 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
30 Actor::<BlocksGet, Block>::new_responder(id)
31 }
32
33 pub fn overlay(&self) -> &OverlayId {
34 match self {
35 Self::V0(v0) => v0.overlay.as_ref().unwrap(),
36 }
37 }
38 pub fn set_overlay(&mut self, overlay: OverlayId) {
39 match self {
40 Self::V0(v0) => v0.overlay = Some(overlay),
41 }
42 }
43}
44
45impl TryFrom<ProtocolMessage> for BlocksGet {
46 type Error = ProtocolError;
47 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
48 let req: ClientRequestContentV0 = msg.try_into()?;
49 if let ClientRequestContentV0::BlocksGet(a) = req {
50 Ok(a)
51 } else {
52 log_debug!("INVALID {:?}", req);
53 Err(ProtocolError::InvalidValue)
54 }
55 }
56}
57
58impl From<BlocksGet> for ProtocolMessage {
59 fn from(msg: BlocksGet) -> ProtocolMessage {
60 let overlay = *msg.overlay();
61 ProtocolMessage::from_client_request_v0(ClientRequestContentV0::BlocksGet(msg), overlay)
62 }
63}
64
65impl Actor<'_, BlocksGet, Block> {}
66
67#[async_trait::async_trait]
68impl EActor for Actor<'_, BlocksGet, Block> {
69 async fn respond(
70 &mut self,
71 msg: ProtocolMessage,
72 fsm: Arc<Mutex<NoiseFSM>>,
73 ) -> Result<(), ProtocolError> {
74 let req = BlocksGet::try_from(msg)?;
75 let server = { BROKER.read().await.get_server_broker()? };
76 let mut lock = fsm.lock().await;
77 let mut something_was_sent = false;
78
79 #[async_recursion]
80 async fn process_children(
81 children: &Vec<BlockId>,
82 server: &RwLock<dyn IServerBroker + Send + Sync>,
83 overlay: &OverlayId,
84 lock: &mut MutexGuard<'_, NoiseFSM>,
85 req_id: i64,
86 include_children: bool,
87 something_was_sent: &mut bool,
88 ) {
89 for block_id in children {
90 if let Ok(block) = { server.read().await.get_block(overlay, block_id) } {
91 let grand_children = block.children().to_vec();
92 if let Err(_) = lock.send_in_reply_to(block.into(), req_id).await {
93 break;
94 }
95 *something_was_sent = true;
96 if include_children {
97 process_children(
98 &grand_children,
99 server,
100 overlay,
101 lock,
102 req_id,
103 include_children,
104 something_was_sent,
105 )
106 .await;
107 }
108 }
109 }
110 }
111 process_children(
112 req.ids(),
113 &server,
114 req.overlay(),
115 &mut lock,
116 self.id(),
117 req.include_children(),
118 &mut something_was_sent,
119 )
120 .await;
121
122 if !something_was_sent {
123 let re: Result<(), ServerError> = Err(ServerError::NotFound);
124 lock.send_in_reply_to(re.into(), self.id()).await?;
125 } else {
126 let re: Result<(), ServerError> = Err(ServerError::EndOfStream);
127 lock.send_in_reply_to(re.into(), self.id()).await?;
128 }
129
130 Ok(())
131 }
132}