Skip to main content

ave_core/distribution/
mod.rs

1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span};
10
11use crate::{
12    distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13    helpers::network::service::NetworkSender,
14    metrics::try_core_metrics,
15    model::common::emit_fail,
16    request::manager::{RequestManager, RequestManagerMessage},
17    subject::SignedLedger,
18};
19
20pub mod coordinator;
21pub mod error;
22pub mod worker;
23
/// How a distribution round was started.
///
/// The variant selects both the `request_id` string handed to every
/// coordinator and the way the round is closed once all witnesses responded.
#[derive(Debug, Clone)]
pub enum DistributionType {
    /// Coordinators receive `node/manual_distribution/{subject_id}`; when the
    /// round completes the `Distribution` actor stops itself.
    Manual,
    /// Coordinators receive `request/{subject_id}/distribution`; when the
    /// round completes the parent `RequestManager` is sent `FinishRequest`.
    Request,
}
29
30pub struct Distribution {
31    network: Arc<NetworkSender>,
32    witnesses: HashSet<PublicKey>,
33    distribution_type: DistributionType,
34    subject_id: DigestIdentifier,
35    request_id: DigestIdentifier,
36}
37
38impl Distribution {
39    fn observe_event(result: &'static str) {
40        if let Some(metrics) = try_core_metrics() {
41            metrics.observe_protocol_event("distribution", result);
42        }
43    }
44
45    pub fn new(
46        network: Arc<NetworkSender>,
47        distribution_type: DistributionType,
48        request_id: DigestIdentifier,
49    ) -> Self {
50        Self {
51            request_id,
52            network,
53            distribution_type,
54            witnesses: HashSet::new(),
55            subject_id: DigestIdentifier::default(),
56        }
57    }
58
59    fn check_witness(&mut self, witness: PublicKey) -> bool {
60        self.witnesses.remove(&witness)
61    }
62
63    async fn create_distributors(
64        &self,
65        ctx: &mut ActorContext<Self>,
66        ledger: SignedLedger,
67        signer: PublicKey,
68    ) -> Result<(), ActorError> {
69        let child = ctx
70            .create_child(
71                &format!("{}", signer),
72                DistriCoordinator {
73                    node_key: signer.clone(),
74                    network: self.network.clone(),
75                },
76            )
77            .await;
78        let distributor_actor = match child {
79            Ok(child) => child,
80            Err(e) => {
81                error!(
82                    subject_id = %self.subject_id,
83                    witness = %signer,
84                    error = %e,
85                    "Failed to create distributor coordinator"
86                );
87                return Err(e);
88            }
89        };
90
91        let request_id = match self.distribution_type {
92            DistributionType::Manual => {
93                format!("node/manual_distribution/{}", self.subject_id)
94            }
95            DistributionType::Request => {
96                format!("request/{}/distribution", self.subject_id)
97            }
98        };
99
100        distributor_actor
101            .tell(DistriCoordinatorMessage::NetworkDistribution {
102                request_id,
103                ledger: Box::new(ledger),
104            })
105            .await
106    }
107
108    async fn end_request(
109        &self,
110        ctx: &ActorContext<Self>,
111    ) -> Result<(), ActorError> {
112        if matches!(self.distribution_type, DistributionType::Request) {
113            let req_actor = ctx.get_parent::<RequestManager>().await?;
114            req_actor
115                .tell(RequestManagerMessage::FinishRequest {
116                    request_id: self.request_id.clone(),
117                })
118                .await?;
119        } else {
120            ctx.stop(None).await;
121        }
122
123        Ok(())
124    }
125}
126
127#[async_trait]
128impl Actor for Distribution {
129    type Event = ();
130    type Message = DistributionMessage;
131    type Response = ();
132
133    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
134        parent_span.map_or_else(
135            || info_span!("Distribution", id),
136            |parent_span| info_span!(parent: parent_span, "Distribution", id),
137        )
138    }
139}
140
141#[derive(Debug, Clone)]
142pub enum DistributionMessage {
143    Create {
144        ledger: Box<SignedLedger>,
145        witnesses: HashSet<PublicKey>,
146    },
147    Response {
148        sender: PublicKey,
149    },
150}
151
152impl Message for DistributionMessage {}
153
154impl NotPersistentActor for Distribution {}
155
156#[async_trait]
157impl Handler<Self> for Distribution {
158    async fn handle_message(
159        &mut self,
160        _sender: ActorPath,
161        msg: DistributionMessage,
162        ctx: &mut ActorContext<Self>,
163    ) -> Result<(), ActorError> {
164        match msg {
165            DistributionMessage::Create { ledger, witnesses } => {
166                self.witnesses.clone_from(&witnesses);
167                self.subject_id = ledger.content().get_subject_id();
168
169                debug!(
170                    msg_type = "Create",
171                    subject_id = %self.subject_id,
172                    witnesses_count = witnesses.len(),
173                    distribution_type = ?self.distribution_type,
174                    "Starting distribution to witnesses"
175                );
176
177                for witness in witnesses.iter() {
178                    self.create_distributors(
179                        ctx,
180                        *ledger.clone(),
181                        witness.clone(),
182                    )
183                    .await?
184                }
185
186                debug!(
187                    msg_type = "Create",
188                    subject_id = %self.subject_id,
189                    "All distributor coordinators created"
190                );
191            }
192            DistributionMessage::Response { sender } => {
193                debug!(
194                    msg_type = "Response",
195                    subject_id = %self.subject_id,
196                    sender = %sender,
197                    remaining_witnesses = self.witnesses.len(),
198                    "Distribution response received"
199                );
200
201                if self.check_witness(sender.clone())
202                    && self.witnesses.is_empty()
203                {
204                    Self::observe_event("success");
205                    debug!(
206                        msg_type = "Response",
207                        subject_id = %self.subject_id,
208                        "All witnesses responded, ending distribution"
209                    );
210
211                    if let Err(e) = self.end_request(ctx).await {
212                        error!(
213                            msg_type = "Response",
214                            subject_id = %self.subject_id,
215                            request_id = %self.request_id,
216                            error = %e,
217                            "Failed to end distribution request"
218                        );
219                        return Err(emit_fail(ctx, e).await);
220                    };
221                }
222            }
223        }
224
225        Ok(())
226    }
227
228    async fn on_child_fault(
229        &mut self,
230        error: ActorError,
231        ctx: &mut ActorContext<Self>,
232    ) -> ChildAction {
233        Self::observe_event("error");
234        error!(
235            subject_id = %self.subject_id,
236            request_id = %self.request_id,
237            distribution_type = ?self.distribution_type,
238            error = %error,
239            "Child fault in distribution actor"
240        );
241        emit_fail(ctx, error).await;
242        ChildAction::Stop
243    }
244}