1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span};
10
11use crate::{
12 distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13 helpers::network::service::NetworkSender,
14 metrics::try_core_metrics,
15 model::common::emit_fail,
16 request::manager::{RequestManager, RequestManagerMessage},
17 subject::SignedLedger,
18};
19
20pub mod coordinator;
21pub mod error;
22pub mod worker;
23
24#[derive(Debug, Clone)]
25pub enum DistributionType {
26 Manual,
27 Request,
28}
29
30pub struct Distribution {
31 network: Arc<NetworkSender>,
32 witnesses: HashSet<PublicKey>,
33 distribution_type: DistributionType,
34 subject_id: DigestIdentifier,
35 request_id: DigestIdentifier,
36}
37
38impl Distribution {
39 fn observe_event(result: &'static str) {
40 if let Some(metrics) = try_core_metrics() {
41 metrics.observe_protocol_event("distribution", result);
42 }
43 }
44
45 pub fn new(
46 network: Arc<NetworkSender>,
47 distribution_type: DistributionType,
48 request_id: DigestIdentifier,
49 ) -> Self {
50 Self {
51 request_id,
52 network,
53 distribution_type,
54 witnesses: HashSet::new(),
55 subject_id: DigestIdentifier::default(),
56 }
57 }
58
59 fn check_witness(&mut self, witness: PublicKey) -> bool {
60 self.witnesses.remove(&witness)
61 }
62
63 async fn create_distributors(
64 &self,
65 ctx: &mut ActorContext<Self>,
66 ledger: SignedLedger,
67 signer: PublicKey,
68 ) -> Result<(), ActorError> {
69 let child = ctx
70 .create_child(
71 &format!("{}", signer),
72 DistriCoordinator {
73 node_key: signer.clone(),
74 network: self.network.clone(),
75 },
76 )
77 .await;
78 let distributor_actor = match child {
79 Ok(child) => child,
80 Err(e) => {
81 error!(
82 subject_id = %self.subject_id,
83 witness = %signer,
84 error = %e,
85 "Failed to create distributor coordinator"
86 );
87 return Err(e);
88 }
89 };
90
91 let request_id = match self.distribution_type {
92 DistributionType::Manual => {
93 format!("node/manual_distribution/{}", self.subject_id)
94 }
95 DistributionType::Request => {
96 format!("request/{}/distribution", self.subject_id)
97 }
98 };
99
100 distributor_actor
101 .tell(DistriCoordinatorMessage::NetworkDistribution {
102 request_id,
103 ledger: Box::new(ledger),
104 })
105 .await
106 }
107
108 async fn end_request(
109 &self,
110 ctx: &ActorContext<Self>,
111 ) -> Result<(), ActorError> {
112 if matches!(self.distribution_type, DistributionType::Request) {
113 let req_actor = ctx.get_parent::<RequestManager>().await?;
114 req_actor
115 .tell(RequestManagerMessage::FinishRequest {
116 request_id: self.request_id.clone(),
117 })
118 .await?;
119 } else {
120 ctx.stop(None).await;
121 }
122
123 Ok(())
124 }
125}
126
127#[async_trait]
128impl Actor for Distribution {
129 type Event = ();
130 type Message = DistributionMessage;
131 type Response = ();
132
133 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
134 parent_span.map_or_else(
135 || info_span!("Distribution", id),
136 |parent_span| info_span!(parent: parent_span, "Distribution", id),
137 )
138 }
139}
140
141#[derive(Debug, Clone)]
142pub enum DistributionMessage {
143 Create {
144 ledger: Box<SignedLedger>,
145 witnesses: HashSet<PublicKey>,
146 },
147 Response {
148 sender: PublicKey,
149 },
150}
151
152impl Message for DistributionMessage {}
153
154impl NotPersistentActor for Distribution {}
155
156#[async_trait]
157impl Handler<Self> for Distribution {
158 async fn handle_message(
159 &mut self,
160 _sender: ActorPath,
161 msg: DistributionMessage,
162 ctx: &mut ActorContext<Self>,
163 ) -> Result<(), ActorError> {
164 match msg {
165 DistributionMessage::Create { ledger, witnesses } => {
166 self.witnesses.clone_from(&witnesses);
167 self.subject_id = ledger.content().get_subject_id();
168
169 debug!(
170 msg_type = "Create",
171 subject_id = %self.subject_id,
172 witnesses_count = witnesses.len(),
173 distribution_type = ?self.distribution_type,
174 "Starting distribution to witnesses"
175 );
176
177 for witness in witnesses.iter() {
178 self.create_distributors(
179 ctx,
180 *ledger.clone(),
181 witness.clone(),
182 )
183 .await?
184 }
185
186 debug!(
187 msg_type = "Create",
188 subject_id = %self.subject_id,
189 "All distributor coordinators created"
190 );
191 }
192 DistributionMessage::Response { sender } => {
193 debug!(
194 msg_type = "Response",
195 subject_id = %self.subject_id,
196 sender = %sender,
197 remaining_witnesses = self.witnesses.len(),
198 "Distribution response received"
199 );
200
201 if self.check_witness(sender.clone())
202 && self.witnesses.is_empty()
203 {
204 Self::observe_event("success");
205 debug!(
206 msg_type = "Response",
207 subject_id = %self.subject_id,
208 "All witnesses responded, ending distribution"
209 );
210
211 if let Err(e) = self.end_request(ctx).await {
212 error!(
213 msg_type = "Response",
214 subject_id = %self.subject_id,
215 request_id = %self.request_id,
216 error = %e,
217 "Failed to end distribution request"
218 );
219 return Err(emit_fail(ctx, e).await);
220 };
221 }
222 }
223 }
224
225 Ok(())
226 }
227
228 async fn on_child_fault(
229 &mut self,
230 error: ActorError,
231 ctx: &mut ActorContext<Self>,
232 ) -> ChildAction {
233 Self::observe_event("error");
234 error!(
235 subject_id = %self.subject_id,
236 request_id = %self.request_id,
237 distribution_type = ?self.distribution_type,
238 error = %error,
239 "Child fault in distribution actor"
240 );
241 emit_fail(ctx, error).await;
242 ChildAction::Stop
243 }
244}