1use std::{collections::HashSet, sync::Arc};
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use tracing::{Span, debug, error, info_span, warn};
10
11use crate::{
12 distribution::coordinator::{DistriCoordinator, DistriCoordinatorMessage},
13 helpers::network::service::NetworkSender,
14 metrics::try_core_metrics,
15 model::{common::emit_fail, event::Ledger},
16 request::manager::{RequestManager, RequestManagerMessage},
17 request::types::{DistributionPlanEntry, DistributionPlanMode},
18};
19
20pub mod coordinator;
21pub mod error;
22pub mod worker;
23
/// Kind of distribution run handled by a [`Distribution`] actor.
///
/// The variant selects the reply path passed to each coordinator and decides
/// what happens once every witness has answered.
#[derive(Debug, Clone)]
pub enum DistributionType {
    /// Manual distribution: the actor stops itself when it completes.
    Manual,
    /// Distribution that belongs to a request: the parent `RequestManager`
    /// is told to finish the request when it completes.
    Request,
}
29
30pub struct Distribution {
31 network: Arc<NetworkSender>,
32 witnesses: HashSet<PublicKey>,
33 distribution_type: DistributionType,
34 subject_id: DigestIdentifier,
35 request_id: DigestIdentifier,
36}
37
38impl Distribution {
39 fn observe_event(result: &'static str) {
40 if let Some(metrics) = try_core_metrics() {
41 metrics.observe_protocol_event("distribution", result);
42 }
43 }
44
45 pub fn new(
46 network: Arc<NetworkSender>,
47 distribution_type: DistributionType,
48 request_id: DigestIdentifier,
49 ) -> Self {
50 Self {
51 request_id,
52 network,
53 distribution_type,
54 witnesses: HashSet::new(),
55 subject_id: DigestIdentifier::default(),
56 }
57 }
58
59 fn check_witness(&mut self, witness: PublicKey) -> bool {
60 self.witnesses.remove(&witness)
61 }
62
63 fn project_ledger_for_mode(
64 ledger: &Ledger,
65 mode: &DistributionPlanMode,
66 ) -> Result<Ledger, ActorError> {
67 match mode {
68 DistributionPlanMode::Clear => Ok(ledger.clone()),
69 DistributionPlanMode::Opaque => {
70 ledger.to_tracker_opaque().map_err(Into::into)
71 }
72 }
73 }
74
75 async fn create_distributor(
76 &self,
77 ctx: &mut ActorContext<Self>,
78 ledger: Ledger,
79 signer: PublicKey,
80 ) -> Result<(), ActorError> {
81 let child = ctx
82 .create_child(
83 &format!("{}", signer),
84 DistriCoordinator {
85 node_key: signer.clone(),
86 network: self.network.clone(),
87 },
88 )
89 .await;
90 let distributor_actor = match child {
91 Ok(child) => child,
92 Err(e) => {
93 error!(
94 subject_id = %self.subject_id,
95 witness = %signer,
96 error = %e,
97 "Failed to create distributor coordinator"
98 );
99 return Err(e);
100 }
101 };
102
103 let request_id = match self.distribution_type {
104 DistributionType::Manual => {
105 format!("node/manual_distribution/{}", self.subject_id)
106 }
107 DistributionType::Request => {
108 format!("request/{}/distribution", self.subject_id)
109 }
110 };
111
112 distributor_actor
113 .tell(DistriCoordinatorMessage::NetworkDistribution {
114 request_id,
115 ledger: Box::new(ledger),
116 })
117 .await
118 }
119
120 async fn end_request(
121 &self,
122 ctx: &ActorContext<Self>,
123 ) -> Result<(), ActorError> {
124 if matches!(self.distribution_type, DistributionType::Request) {
125 let req_actor = ctx.get_parent::<RequestManager>().await?;
126 req_actor
127 .tell(RequestManagerMessage::FinishRequest {
128 request_id: self.request_id.clone(),
129 })
130 .await?;
131 } else {
132 ctx.stop(None).await;
133 }
134
135 Ok(())
136 }
137}
138
139#[async_trait]
140impl Actor for Distribution {
141 type Event = ();
142 type Message = DistributionMessage;
143 type Response = ();
144
145 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
146 parent_span.map_or_else(
147 || info_span!("Distribution", id),
148 |parent_span| info_span!(parent: parent_span, "Distribution", id),
149 )
150 }
151}
152
153#[derive(Debug, Clone)]
154pub enum DistributionMessage {
155 Create {
156 ledger: Box<Ledger>,
157 distribution_plan: Vec<DistributionPlanEntry>,
158 },
159 Response {
160 sender: PublicKey,
161 },
162}
163
164impl Message for DistributionMessage {}
165
166impl NotPersistentActor for Distribution {}
167
168#[async_trait]
169impl Handler<Self> for Distribution {
170 async fn handle_message(
171 &mut self,
172 _sender: ActorPath,
173 msg: DistributionMessage,
174 ctx: &mut ActorContext<Self>,
175 ) -> Result<(), ActorError> {
176 match msg {
177 DistributionMessage::Create {
178 ledger,
179 distribution_plan,
180 } => {
181 self.witnesses = distribution_plan
182 .iter()
183 .map(|entry| entry.node.clone())
184 .collect();
185 self.subject_id = ledger.get_subject_id();
186 let clear_ledger = (*ledger).clone();
187 let opaque_ledger = if distribution_plan.iter().any(|entry| {
188 matches!(entry.mode, DistributionPlanMode::Opaque)
189 }) {
190 Some(Self::project_ledger_for_mode(
191 &clear_ledger,
192 &DistributionPlanMode::Opaque,
193 )?)
194 } else {
195 None
196 };
197
198 debug!(
199 msg_type = "Create",
200 subject_id = %self.subject_id,
201 witnesses_count = distribution_plan.len(),
202 distribution_type = ?self.distribution_type,
203 "Starting distribution to witnesses"
204 );
205
206 for entry in distribution_plan {
207 let ledger = match entry.mode {
208 DistributionPlanMode::Clear => clear_ledger.clone(),
209 DistributionPlanMode::Opaque => opaque_ledger
210 .clone()
211 .ok_or_else(|| ActorError::FunctionalCritical {
212 description: format!(
213 "Missing opaque distribution projection for subject {}",
214 self.subject_id
215 ),
216 })?,
217 };
218
219 self.create_distributor(ctx, ledger, entry.node).await?
220 }
221
222 debug!(
223 msg_type = "Create",
224 subject_id = %self.subject_id,
225 "All distributor coordinators created"
226 );
227 }
228 DistributionMessage::Response { sender } => {
229 let removed = self.check_witness(sender.clone());
230 let remaining_witnesses = self.witnesses.len();
231
232 if !removed {
233 warn!(
234 msg_type = "Response",
235 subject_id = %self.subject_id,
236 sender = %sender,
237 remaining_witnesses = remaining_witnesses,
238 "Ignoring response from unexpected or already-processed witness"
239 );
240 return Ok(());
241 }
242
243 debug!(
244 msg_type = "Response",
245 subject_id = %self.subject_id,
246 sender = %sender,
247 remaining_witnesses = remaining_witnesses,
248 "Distribution response received"
249 );
250
251 if remaining_witnesses == 0 {
252 Self::observe_event("success");
253 debug!(
254 msg_type = "Response",
255 subject_id = %self.subject_id,
256 "All witnesses responded, ending distribution"
257 );
258
259 if let Err(e) = self.end_request(ctx).await {
260 error!(
261 msg_type = "Response",
262 subject_id = %self.subject_id,
263 request_id = %self.request_id,
264 error = %e,
265 "Failed to end distribution request"
266 );
267 return Err(emit_fail(ctx, e).await);
268 };
269 }
270 }
271 }
272
273 Ok(())
274 }
275
276 async fn on_child_fault(
277 &mut self,
278 error: ActorError,
279 ctx: &mut ActorContext<Self>,
280 ) -> ChildAction {
281 Self::observe_event("error");
282 error!(
283 subject_id = %self.subject_id,
284 request_id = %self.request_id,
285 distribution_type = ?self.distribution_type,
286 error = %error,
287 "Child fault in distribution actor"
288 );
289 emit_fail(ctx, error).await;
290 ChildAction::Stop
291 }
292}