ng_net/actors/client/
commit_get.rs1use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::types::{Block, OverlayId};
19
20use crate::broker::BROKER;
21use crate::connection::NoiseFSM;
22use crate::types::*;
23use crate::{actor::*, types::ProtocolMessage};
24
25impl CommitGet {
26 pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
27 Actor::<CommitGet, Block>::new_responder(id)
28 }
29
30 pub fn overlay(&self) -> &OverlayId {
31 match self {
32 Self::V0(v0) => v0.overlay.as_ref().unwrap(),
33 }
34 }
35 pub fn set_overlay(&mut self, overlay: OverlayId) {
36 match self {
37 Self::V0(v0) => v0.overlay = Some(overlay),
38 }
39 }
40}
41
42impl TryFrom<ProtocolMessage> for CommitGet {
43 type Error = ProtocolError;
44 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
45 let req: ClientRequestContentV0 = msg.try_into()?;
46 if let ClientRequestContentV0::CommitGet(a) = req {
47 Ok(a)
48 } else {
49 log_debug!("INVALID {:?}", req);
50 Err(ProtocolError::InvalidValue)
51 }
52 }
53}
54
55impl From<CommitGet> for ProtocolMessage {
56 fn from(msg: CommitGet) -> ProtocolMessage {
57 let overlay = *msg.overlay();
58 ProtocolMessage::from_client_request_v0(ClientRequestContentV0::CommitGet(msg), overlay)
59 }
60}
61
62impl TryFrom<ProtocolMessage> for Block {
63 type Error = ProtocolError;
64 fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
65 let res: ClientResponseContentV0 = msg.try_into()?;
66 if let ClientResponseContentV0::Block(a) = res {
67 Ok(a)
68 } else {
69 log_debug!("INVALID {:?}", res);
70 Err(ProtocolError::InvalidValue)
71 }
72 }
73}
74
75impl From<Block> for ProtocolMessage {
76 fn from(b: Block) -> ProtocolMessage {
77 let mut cr: ClientResponse = ClientResponseContentV0::Block(b).into();
78 cr.set_result(ServerError::PartialContent.into());
79 cr.into()
80 }
81}
82
83impl Actor<'_, CommitGet, Block> {}
84
85#[async_trait::async_trait]
86impl EActor for Actor<'_, CommitGet, Block> {
87 async fn respond(
88 &mut self,
89 msg: ProtocolMessage,
90 fsm: Arc<Mutex<NoiseFSM>>,
91 ) -> Result<(), ProtocolError> {
92 let req = CommitGet::try_from(msg)?;
93 let broker = { BROKER.read().await.get_server_broker()? };
94 let blocks_res = { broker.read().await.get_commit(req.overlay(), req.id()) };
95 match blocks_res {
97 Ok(blocks) => {
98 if blocks.is_empty() {
99 let re: Result<(), ServerError> = Err(ServerError::EmptyStream);
100 fsm.lock()
101 .await
102 .send_in_reply_to(re.into(), self.id())
103 .await?;
104 return Ok(());
105 }
106 let mut lock = fsm.lock().await;
107
108 for block in blocks {
109 lock.send_in_reply_to(block.into(), self.id()).await?;
110 }
111 let re: Result<(), ServerError> = Err(ServerError::EndOfStream);
112 lock.send_in_reply_to(re.into(), self.id()).await?;
113 }
114 Err(e) => {
115 let re: Result<(), ServerError> = Err(e);
116 fsm.lock()
117 .await
118 .send_in_reply_to(re.into(), self.id())
119 .await?;
120 }
121 }
122
123 Ok(())
124 }
125}