Skip to main content

ave_core/distribution/
mod.rs

1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span};
10
11use crate::{
12    distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13    helpers::network::service::NetworkSender,
14    model::common::emit_fail,
15    request::manager::{RequestManager, RequestManagerMessage},
16    subject::SignedLedger,
17};
18
19pub mod coordinator;
20pub mod error;
21pub mod worker;
22
23#[derive(Debug, Clone)]
24pub enum DistributionType {
25    Manual,
26    Request,
27}
28
29pub struct Distribution {
30    network: Arc<NetworkSender>,
31    witnesses: HashSet<PublicKey>,
32    distribution_type: DistributionType,
33    subject_id: DigestIdentifier,
34    request_id: DigestIdentifier,
35}
36
37impl Distribution {
38    pub fn new(
39        network: Arc<NetworkSender>,
40        distribution_type: DistributionType,
41        request_id: DigestIdentifier,
42    ) -> Self {
43        Self {
44            request_id,
45            network,
46            distribution_type,
47            witnesses: HashSet::new(),
48            subject_id: DigestIdentifier::default(),
49        }
50    }
51
52    fn check_witness(&mut self, witness: PublicKey) -> bool {
53        self.witnesses.remove(&witness)
54    }
55
56    async fn create_distributors(
57        &self,
58        ctx: &mut ActorContext<Self>,
59        ledger: SignedLedger,
60        signer: PublicKey,
61    ) -> Result<(), ActorError> {
62        let child = ctx
63            .create_child(
64                &format!("{}", signer),
65                DistriCoordinator {
66                    node_key: signer.clone(),
67                    network: self.network.clone(),
68                },
69            )
70            .await;
71        let distributor_actor = match child {
72            Ok(child) => child,
73            Err(e) => {
74                error!(
75                    subject_id = %self.subject_id,
76                    witness = %signer,
77                    error = %e,
78                    "Failed to create distributor coordinator"
79                );
80                return Err(e);
81            }
82        };
83
84        let request_id = match self.distribution_type {
85            DistributionType::Manual => {
86                format!("node/manual_distribution/{}", self.subject_id)
87            }
88            DistributionType::Request => {
89                format!("request/{}/distribution", self.subject_id)
90            }
91        };
92
93        distributor_actor
94            .tell(DistriCoordinatorMessage::NetworkDistribution {
95                request_id,
96                ledger: Box::new(ledger),
97            })
98            .await
99    }
100
101    async fn end_request(
102        &self,
103        ctx: &ActorContext<Self>,
104    ) -> Result<(), ActorError> {
105        if matches!(self.distribution_type, DistributionType::Request) {
106            let req_actor = ctx.get_parent::<RequestManager>().await?;
107            req_actor
108                .tell(RequestManagerMessage::FinishRequest {
109                    request_id: self.request_id.clone(),
110                })
111                .await?;
112        } else {
113            ctx.stop(None).await;
114        }
115
116        Ok(())
117    }
118}
119
120#[async_trait]
121impl Actor for Distribution {
122    type Event = ();
123    type Message = DistributionMessage;
124    type Response = ();
125
126    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
127        parent_span.map_or_else(
128            || info_span!("Distribution", id),
129            |parent_span| info_span!(parent: parent_span, "Distribution", id),
130        )
131    }
132}
133
134#[derive(Debug, Clone)]
135pub enum DistributionMessage {
136    Create {
137        ledger: Box<SignedLedger>,
138        witnesses: HashSet<PublicKey>,
139    },
140    Response {
141        sender: PublicKey,
142    },
143}
144
145impl Message for DistributionMessage {}
146
147impl NotPersistentActor for Distribution {}
148
149#[async_trait]
150impl Handler<Self> for Distribution {
151    async fn handle_message(
152        &mut self,
153        _sender: ActorPath,
154        msg: DistributionMessage,
155        ctx: &mut ActorContext<Self>,
156    ) -> Result<(), ActorError> {
157        match msg {
158            DistributionMessage::Create { ledger, witnesses } => {
159                self.witnesses.clone_from(&witnesses);
160                self.subject_id = ledger.content().get_subject_id();
161
162                debug!(
163                    msg_type = "Create",
164                    subject_id = %self.subject_id,
165                    witnesses_count = witnesses.len(),
166                    distribution_type = ?self.distribution_type,
167                    "Starting distribution to witnesses"
168                );
169
170                for witness in witnesses.iter() {
171                    self.create_distributors(
172                        ctx,
173                        *ledger.clone(),
174                        witness.clone(),
175                    )
176                    .await?
177                }
178
179                debug!(
180                    msg_type = "Create",
181                    subject_id = %self.subject_id,
182                    "All distributor coordinators created"
183                );
184            }
185            DistributionMessage::Response { sender } => {
186                debug!(
187                    msg_type = "Response",
188                    subject_id = %self.subject_id,
189                    sender = %sender,
190                    remaining_witnesses = self.witnesses.len(),
191                    "Distribution response received"
192                );
193
194                if self.check_witness(sender.clone())
195                    && self.witnesses.is_empty()
196                {
197                    debug!(
198                        msg_type = "Response",
199                        subject_id = %self.subject_id,
200                        "All witnesses responded, ending distribution"
201                    );
202
203                    if let Err(e) = self.end_request(ctx).await {
204                        error!(
205                            msg_type = "Response",
206                            subject_id = %self.subject_id,
207                            request_id = %self.request_id,
208                            error = %e,
209                            "Failed to end distribution request"
210                        );
211                        return Err(emit_fail(ctx, e).await);
212                    };
213                }
214            }
215        }
216
217        Ok(())
218    }
219
220    async fn on_child_fault(
221        &mut self,
222        error: ActorError,
223        ctx: &mut ActorContext<Self>,
224    ) -> ChildAction {
225        error!(
226            subject_id = %self.subject_id,
227            request_id = %self.request_id,
228            distribution_type = ?self.distribution_type,
229            error = %error,
230            "Child fault in distribution actor"
231        );
232        emit_fail(ctx, error).await;
233        ChildAction::Stop
234    }
235}