ng_net/actors/client/
commit_get.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12use std::sync::Arc;
13
14use async_std::sync::Mutex;
15
16use ng_repo::errors::*;
17use ng_repo::log::*;
18use ng_repo::types::{Block, OverlayId};
19
20use crate::broker::BROKER;
21use crate::connection::NoiseFSM;
22use crate::types::*;
23use crate::{actor::*, types::ProtocolMessage};
24
25impl CommitGet {
26    pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
27        Actor::<CommitGet, Block>::new_responder(id)
28    }
29
30    pub fn overlay(&self) -> &OverlayId {
31        match self {
32            Self::V0(v0) => v0.overlay.as_ref().unwrap(),
33        }
34    }
35    pub fn set_overlay(&mut self, overlay: OverlayId) {
36        match self {
37            Self::V0(v0) => v0.overlay = Some(overlay),
38        }
39    }
40}
41
42impl TryFrom<ProtocolMessage> for CommitGet {
43    type Error = ProtocolError;
44    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
45        let req: ClientRequestContentV0 = msg.try_into()?;
46        if let ClientRequestContentV0::CommitGet(a) = req {
47            Ok(a)
48        } else {
49            log_debug!("INVALID {:?}", req);
50            Err(ProtocolError::InvalidValue)
51        }
52    }
53}
54
55impl From<CommitGet> for ProtocolMessage {
56    fn from(msg: CommitGet) -> ProtocolMessage {
57        let overlay = *msg.overlay();
58        ProtocolMessage::from_client_request_v0(ClientRequestContentV0::CommitGet(msg), overlay)
59    }
60}
61
62impl TryFrom<ProtocolMessage> for Block {
63    type Error = ProtocolError;
64    fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
65        let res: ClientResponseContentV0 = msg.try_into()?;
66        if let ClientResponseContentV0::Block(a) = res {
67            Ok(a)
68        } else {
69            log_debug!("INVALID {:?}", res);
70            Err(ProtocolError::InvalidValue)
71        }
72    }
73}
74
75impl From<Block> for ProtocolMessage {
76    fn from(b: Block) -> ProtocolMessage {
77        let mut cr: ClientResponse = ClientResponseContentV0::Block(b).into();
78        cr.set_result(ServerError::PartialContent.into());
79        cr.into()
80    }
81}
82
83impl Actor<'_, CommitGet, Block> {}
84
85#[async_trait::async_trait]
86impl EActor for Actor<'_, CommitGet, Block> {
87    async fn respond(
88        &mut self,
89        msg: ProtocolMessage,
90        fsm: Arc<Mutex<NoiseFSM>>,
91    ) -> Result<(), ProtocolError> {
92        let req = CommitGet::try_from(msg)?;
93        let broker = { BROKER.read().await.get_server_broker()? };
94        let blocks_res = { broker.read().await.get_commit(req.overlay(), req.id()) };
95        // IF NEEDED, the get_commit could be changed to return a stream, and then the send_in_reply_to would be also totally async
96        match blocks_res {
97            Ok(blocks) => {
98                if blocks.is_empty() {
99                    let re: Result<(), ServerError> = Err(ServerError::EmptyStream);
100                    fsm.lock()
101                        .await
102                        .send_in_reply_to(re.into(), self.id())
103                        .await?;
104                    return Ok(());
105                }
106                let mut lock = fsm.lock().await;
107
108                for block in blocks {
109                    lock.send_in_reply_to(block.into(), self.id()).await?;
110                }
111                let re: Result<(), ServerError> = Err(ServerError::EndOfStream);
112                lock.send_in_reply_to(re.into(), self.id()).await?;
113            }
114            Err(e) => {
115                let re: Result<(), ServerError> = Err(e);
116                fsm.lock()
117                    .await
118                    .send_in_reply_to(re.into(), self.id())
119                    .await?;
120            }
121        }
122
123        Ok(())
124    }
125}