Skip to main content

ave_core/distribution/
mod.rs

1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span, warn};
10
11use crate::{
12    distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13    helpers::network::service::NetworkSender,
14    metrics::try_core_metrics,
15    model::{common::emit_fail, event::Ledger},
16    request::manager::{RequestManager, RequestManagerMessage},
17    request::types::{DistributionPlanEntry, DistributionPlanMode},
18};
19
20pub mod coordinator;
21pub mod error;
22pub mod worker;
23
/// How a distribution was started.
///
/// The variant selects the reply path handed to each coordinator and decides
/// how the [`Distribution`] actor finishes once every witness has answered.
#[derive(Debug, Clone)]
pub enum DistributionType {
    /// Manual distribution: coordinators are given a
    /// `node/manual_distribution/<subject>` path and the actor stops itself
    /// when it is done.
    Manual,
    /// Request-driven distribution: coordinators are given a
    /// `request/<subject>/distribution` path and the parent request manager
    /// is told to finish the request when it is done.
    Request,
}
29
30pub struct Distribution {
31    network: Arc<NetworkSender>,
32    witnesses: HashSet<PublicKey>,
33    distribution_type: DistributionType,
34    subject_id: DigestIdentifier,
35    request_id: DigestIdentifier,
36}
37
38impl Distribution {
39    fn observe_event(result: &'static str) {
40        if let Some(metrics) = try_core_metrics() {
41            metrics.observe_protocol_event("distribution", result);
42        }
43    }
44
45    pub fn new(
46        network: Arc<NetworkSender>,
47        distribution_type: DistributionType,
48        request_id: DigestIdentifier,
49    ) -> Self {
50        Self {
51            request_id,
52            network,
53            distribution_type,
54            witnesses: HashSet::new(),
55            subject_id: DigestIdentifier::default(),
56        }
57    }
58
59    fn check_witness(&mut self, witness: PublicKey) -> bool {
60        self.witnesses.remove(&witness)
61    }
62
63    fn project_ledger_for_mode(
64        ledger: &Ledger,
65        mode: &DistributionPlanMode,
66    ) -> Result<Ledger, ActorError> {
67        match mode {
68            DistributionPlanMode::Clear => Ok(ledger.clone()),
69            DistributionPlanMode::Opaque => {
70                ledger.to_tracker_opaque().map_err(Into::into)
71            }
72        }
73    }
74
75    async fn create_distributor(
76        &self,
77        ctx: &mut ActorContext<Self>,
78        ledger: Ledger,
79        signer: PublicKey,
80    ) -> Result<(), ActorError> {
81        let child = ctx
82            .create_child(
83                &format!("{}", signer),
84                DistriCoordinator {
85                    node_key: signer.clone(),
86                    network: self.network.clone(),
87                },
88            )
89            .await;
90        let distributor_actor = match child {
91            Ok(child) => child,
92            Err(e) => {
93                error!(
94                    subject_id = %self.subject_id,
95                    witness = %signer,
96                    error = %e,
97                    "Failed to create distributor coordinator"
98                );
99                return Err(e);
100            }
101        };
102
103        let request_id = match self.distribution_type {
104            DistributionType::Manual => {
105                format!("node/manual_distribution/{}", self.subject_id)
106            }
107            DistributionType::Request => {
108                format!("request/{}/distribution", self.subject_id)
109            }
110        };
111
112        distributor_actor
113            .tell(DistriCoordinatorMessage::NetworkDistribution {
114                request_id,
115                ledger: Box::new(ledger),
116            })
117            .await
118    }
119
120    async fn end_request(
121        &self,
122        ctx: &ActorContext<Self>,
123    ) -> Result<(), ActorError> {
124        if matches!(self.distribution_type, DistributionType::Request) {
125            let req_actor = ctx.get_parent::<RequestManager>().await?;
126            req_actor
127                .tell(RequestManagerMessage::FinishRequest {
128                    request_id: self.request_id.clone(),
129                })
130                .await?;
131        } else {
132            ctx.stop(None).await;
133        }
134
135        Ok(())
136    }
137}
138
139#[async_trait]
140impl Actor for Distribution {
141    type Event = ();
142    type Message = DistributionMessage;
143    type Response = ();
144
145    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
146        parent_span.map_or_else(
147            || info_span!("Distribution", id),
148            |parent_span| info_span!(parent: parent_span, "Distribution", id),
149        )
150    }
151}
152
153#[derive(Debug, Clone)]
154pub enum DistributionMessage {
155    Create {
156        ledger: Box<Ledger>,
157        distribution_plan: Vec<DistributionPlanEntry>,
158    },
159    Response {
160        sender: PublicKey,
161    },
162}
163
164impl Message for DistributionMessage {}
165
166impl NotPersistentActor for Distribution {}
167
168#[async_trait]
169impl Handler<Self> for Distribution {
170    async fn handle_message(
171        &mut self,
172        _sender: ActorPath,
173        msg: DistributionMessage,
174        ctx: &mut ActorContext<Self>,
175    ) -> Result<(), ActorError> {
176        match msg {
177            DistributionMessage::Create {
178                ledger,
179                distribution_plan,
180            } => {
181                self.witnesses = distribution_plan
182                    .iter()
183                    .map(|entry| entry.node.clone())
184                    .collect();
185                self.subject_id = ledger.get_subject_id();
186                let clear_ledger = (*ledger).clone();
187                let opaque_ledger = if distribution_plan.iter().any(|entry| {
188                    matches!(entry.mode, DistributionPlanMode::Opaque)
189                }) {
190                    Some(Self::project_ledger_for_mode(
191                        &clear_ledger,
192                        &DistributionPlanMode::Opaque,
193                    )?)
194                } else {
195                    None
196                };
197
198                debug!(
199                    msg_type = "Create",
200                    subject_id = %self.subject_id,
201                    witnesses_count = distribution_plan.len(),
202                    distribution_type = ?self.distribution_type,
203                    "Starting distribution to witnesses"
204                );
205
206                for entry in distribution_plan {
207                    let ledger = match entry.mode {
208                        DistributionPlanMode::Clear => clear_ledger.clone(),
209                        DistributionPlanMode::Opaque => opaque_ledger
210                            .clone()
211                            .ok_or_else(|| ActorError::FunctionalCritical {
212                                description: format!(
213                                    "Missing opaque distribution projection for subject {}",
214                                    self.subject_id
215                                ),
216                            })?,
217                    };
218
219                    self.create_distributor(ctx, ledger, entry.node).await?
220                }
221
222                debug!(
223                    msg_type = "Create",
224                    subject_id = %self.subject_id,
225                    "All distributor coordinators created"
226                );
227            }
228            DistributionMessage::Response { sender } => {
229                let removed = self.check_witness(sender.clone());
230                let remaining_witnesses = self.witnesses.len();
231
232                if !removed {
233                    warn!(
234                        msg_type = "Response",
235                        subject_id = %self.subject_id,
236                        sender = %sender,
237                        remaining_witnesses = remaining_witnesses,
238                        "Ignoring response from unexpected or already-processed witness"
239                    );
240                    return Ok(());
241                }
242
243                debug!(
244                    msg_type = "Response",
245                    subject_id = %self.subject_id,
246                    sender = %sender,
247                    remaining_witnesses = remaining_witnesses,
248                    "Distribution response received"
249                );
250
251                if remaining_witnesses == 0 {
252                    Self::observe_event("success");
253                    debug!(
254                        msg_type = "Response",
255                        subject_id = %self.subject_id,
256                        "All witnesses responded, ending distribution"
257                    );
258
259                    if let Err(e) = self.end_request(ctx).await {
260                        error!(
261                            msg_type = "Response",
262                            subject_id = %self.subject_id,
263                            request_id = %self.request_id,
264                            error = %e,
265                            "Failed to end distribution request"
266                        );
267                        return Err(emit_fail(ctx, e).await);
268                    };
269                }
270            }
271        }
272
273        Ok(())
274    }
275
276    async fn on_child_fault(
277        &mut self,
278        error: ActorError,
279        ctx: &mut ActorContext<Self>,
280    ) -> ChildAction {
281        Self::observe_event("error");
282        error!(
283            subject_id = %self.subject_id,
284            request_id = %self.request_id,
285            distribution_type = ?self.distribution_type,
286            error = %error,
287            "Child fault in distribution actor"
288        );
289        emit_fail(ctx, error).await;
290        ChildAction::Stop
291    }
292}