1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span};
10
11use crate::{
12 distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13 helpers::network::service::NetworkSender,
14 model::common::emit_fail,
15 request::manager::{RequestManager, RequestManagerMessage},
16 subject::SignedLedger,
17};
18
19pub mod coordinator;
20pub mod error;
21pub mod worker;
22
/// How a distribution was started; selects the `request_id` string sent to
/// each distributor coordinator and what happens once every witness answered.
#[derive(Clone, Debug)]
pub enum DistributionType {
    /// Uses the `node/manual_distribution/<subject_id>` identifier; the
    /// distribution actor stops itself when it finishes.
    Manual,
    /// Uses the `request/<subject_id>/distribution` identifier; the parent
    /// `RequestManager` is told to finish the request when it finishes.
    Request,
}
28
29pub struct Distribution {
30 network: Arc<NetworkSender>,
31 witnesses: HashSet<PublicKey>,
32 distribution_type: DistributionType,
33 subject_id: DigestIdentifier,
34 request_id: DigestIdentifier,
35}
36
37impl Distribution {
38 pub fn new(
39 network: Arc<NetworkSender>,
40 distribution_type: DistributionType,
41 request_id: DigestIdentifier,
42 ) -> Self {
43 Self {
44 request_id,
45 network,
46 distribution_type,
47 witnesses: HashSet::new(),
48 subject_id: DigestIdentifier::default(),
49 }
50 }
51
52 fn check_witness(&mut self, witness: PublicKey) -> bool {
53 self.witnesses.remove(&witness)
54 }
55
56 async fn create_distributors(
57 &self,
58 ctx: &mut ActorContext<Self>,
59 ledger: SignedLedger,
60 signer: PublicKey,
61 ) -> Result<(), ActorError> {
62 let child = ctx
63 .create_child(
64 &format!("{}", signer),
65 DistriCoordinator {
66 node_key: signer.clone(),
67 network: self.network.clone(),
68 },
69 )
70 .await;
71 let distributor_actor = match child {
72 Ok(child) => child,
73 Err(e) => {
74 error!(
75 subject_id = %self.subject_id,
76 witness = %signer,
77 error = %e,
78 "Failed to create distributor coordinator"
79 );
80 return Err(e);
81 }
82 };
83
84 let request_id = match self.distribution_type {
85 DistributionType::Manual => {
86 format!("node/manual_distribution/{}", self.subject_id)
87 }
88 DistributionType::Request => {
89 format!("request/{}/distribution", self.subject_id)
90 }
91 };
92
93 distributor_actor
94 .tell(DistriCoordinatorMessage::NetworkDistribution {
95 request_id,
96 ledger: Box::new(ledger),
97 })
98 .await
99 }
100
101 async fn end_request(
102 &self,
103 ctx: &ActorContext<Self>,
104 ) -> Result<(), ActorError> {
105 if matches!(self.distribution_type, DistributionType::Request) {
106 let req_actor = ctx.get_parent::<RequestManager>().await?;
107 req_actor
108 .tell(RequestManagerMessage::FinishRequest {
109 request_id: self.request_id.clone(),
110 })
111 .await?;
112 } else {
113 ctx.stop(None).await;
114 }
115
116 Ok(())
117 }
118}
119
120#[async_trait]
121impl Actor for Distribution {
122 type Event = ();
123 type Message = DistributionMessage;
124 type Response = ();
125
126 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
127 parent_span.map_or_else(
128 || info_span!("Distribution", id),
129 |parent_span| info_span!(parent: parent_span, "Distribution", id),
130 )
131 }
132}
133
134#[derive(Debug, Clone)]
135pub enum DistributionMessage {
136 Create {
137 ledger: Box<SignedLedger>,
138 witnesses: HashSet<PublicKey>,
139 },
140 Response {
141 sender: PublicKey,
142 },
143}
144
145impl Message for DistributionMessage {}
146
147impl NotPersistentActor for Distribution {}
148
149#[async_trait]
150impl Handler<Self> for Distribution {
151 async fn handle_message(
152 &mut self,
153 _sender: ActorPath,
154 msg: DistributionMessage,
155 ctx: &mut ActorContext<Self>,
156 ) -> Result<(), ActorError> {
157 match msg {
158 DistributionMessage::Create { ledger, witnesses } => {
159 self.witnesses.clone_from(&witnesses);
160 self.subject_id = ledger.content().get_subject_id();
161
162 debug!(
163 msg_type = "Create",
164 subject_id = %self.subject_id,
165 witnesses_count = witnesses.len(),
166 distribution_type = ?self.distribution_type,
167 "Starting distribution to witnesses"
168 );
169
170 for witness in witnesses.iter() {
171 self.create_distributors(
172 ctx,
173 *ledger.clone(),
174 witness.clone(),
175 )
176 .await?
177 }
178
179 debug!(
180 msg_type = "Create",
181 subject_id = %self.subject_id,
182 "All distributor coordinators created"
183 );
184 }
185 DistributionMessage::Response { sender } => {
186 debug!(
187 msg_type = "Response",
188 subject_id = %self.subject_id,
189 sender = %sender,
190 remaining_witnesses = self.witnesses.len(),
191 "Distribution response received"
192 );
193
194 if self.check_witness(sender.clone())
195 && self.witnesses.is_empty()
196 {
197 debug!(
198 msg_type = "Response",
199 subject_id = %self.subject_id,
200 "All witnesses responded, ending distribution"
201 );
202
203 if let Err(e) = self.end_request(ctx).await {
204 error!(
205 msg_type = "Response",
206 subject_id = %self.subject_id,
207 request_id = %self.request_id,
208 error = %e,
209 "Failed to end distribution request"
210 );
211 return Err(emit_fail(ctx, e).await);
212 };
213 }
214 }
215 }
216
217 Ok(())
218 }
219
220 async fn on_child_fault(
221 &mut self,
222 error: ActorError,
223 ctx: &mut ActorContext<Self>,
224 ) -> ChildAction {
225 error!(
226 subject_id = %self.subject_id,
227 request_id = %self.request_id,
228 distribution_type = ?self.distribution_type,
229 error = %error,
230 "Child fault in distribution actor"
231 );
232 emit_fail(ctx, error).await;
233 ChildAction::Stop
234 }
235}