ave_core/manual_distribution/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 identity::{DigestIdentifier, PublicKey},
10 request::EventRequest,
11 schematype::ReservedWords,
12};
13use serde::{Deserialize, Serialize};
14use tracing::{Span, debug, error, info_span, warn};
15
16use crate::{
17 distribution::{Distribution, DistributionMessage, DistributionType},
18 governance::{
19 data::GovernanceData,
20 model::{RoleTypes, WitnessesData},
21 },
22 helpers::network::service::NetworkSender,
23 model::common::{
24 emit_fail,
25 node::i_can_send_last_ledger,
26 subject::{acquire_subject, get_gov, get_last_ledger_event},
27 },
28 request::types::{DistributionPlanEntry, DistributionPlanMode},
29};
30
31pub struct ManualDistribution {
32 our_key: Arc<PublicKey>,
33}
34
35impl ManualDistribution {
36 pub const fn new(our_key: Arc<PublicKey>) -> Self {
37 Self { our_key }
38 }
39
40 fn tracker_fact_mode_for_creator(
41 governance_data: &GovernanceData,
42 schema_id: &ave_common::SchemaType,
43 namespace: &ave_common::Namespace,
44 creator: &PublicKey,
45 witness: &PublicKey,
46 viewpoints: &std::collections::BTreeSet<String>,
47 ) -> DistributionPlanMode {
48 let Some(witness_name) = governance_data
49 .members
50 .iter()
51 .find(|(_, key)| *key == witness)
52 .map(|(name, _)| name.clone())
53 else {
54 return DistributionPlanMode::Opaque;
55 };
56
57 let Some(creator_name) = governance_data
58 .members
59 .iter()
60 .find(|(_, key)| *key == creator)
61 .map(|(name, _)| name.clone())
62 else {
63 return DistributionPlanMode::Opaque;
64 };
65
66 let Some(roles_schema) = governance_data.roles_schema.get(schema_id)
67 else {
68 return DistributionPlanMode::Opaque;
69 };
70
71 let Some(role_creator) = roles_schema.creator.get(
72 &ave_common::governance::RoleCreator::create(
73 &creator_name,
74 namespace.clone(),
75 ),
76 ) else {
77 return DistributionPlanMode::Opaque;
78 };
79
80 let is_generic_witness =
81 roles_schema.hash_this_rol(
82 RoleTypes::Witness,
83 namespace.clone(),
84 &witness_name,
85 ) || governance_data.roles_tracker_schemas.hash_this_rol(
86 RoleTypes::Witness,
87 namespace.clone(),
88 &witness_name,
89 );
90
91 let allows_clear =
92 role_creator.witnesses.iter().any(|creator_witness| {
93 let applies = creator_witness.name == witness_name
94 || (creator_witness.name
95 == ReservedWords::Witnesses.to_string()
96 && is_generic_witness);
97
98 if !applies {
99 return false;
100 }
101
102 creator_witness
103 .viewpoints
104 .contains(&ReservedWords::AllViewpoints.to_string())
105 || viewpoints.is_empty()
106 || viewpoints.is_subset(&creator_witness.viewpoints)
107 });
108
109 if allows_clear {
110 DistributionPlanMode::Clear
111 } else {
112 DistributionPlanMode::Opaque
113 }
114 }
115
116 fn build_tracker_manual_plan(
117 governance_data: &GovernanceData,
118 schema_id: ave_common::SchemaType,
119 namespace: ave_common::Namespace,
120 event_request: &EventRequest,
121 signer: &PublicKey,
122 ) -> Result<Vec<DistributionPlanEntry>, ActorError> {
123 let witnesses = governance_data
124 .get_witnesses(WitnessesData::Schema {
125 creator: signer.clone(),
126 schema_id: schema_id.clone(),
127 namespace: namespace.clone(),
128 })
129 .map_err(|e| ActorError::Functional {
130 description: e.to_string(),
131 })?;
132
133 Ok(witnesses
134 .into_iter()
135 .map(|node| {
136 let mode = match event_request {
137 EventRequest::Fact(fact_request) => {
138 Self::tracker_fact_mode_for_creator(
139 governance_data,
140 &schema_id,
141 &namespace,
142 signer,
143 &node,
144 &fact_request.viewpoints,
145 )
146 }
147 _ => DistributionPlanMode::Clear,
148 };
149
150 DistributionPlanEntry { node, mode }
151 })
152 .collect())
153 }
154}
155
156#[derive(Clone, Debug, Serialize, Deserialize)]
157pub enum ManualDistributionMessage {
158 Update(DigestIdentifier),
159}
160
161impl Message for ManualDistributionMessage {}
162
163impl NotPersistentActor for ManualDistribution {}
164
165#[async_trait]
166impl Actor for ManualDistribution {
167 type Message = ManualDistributionMessage;
168 type Event = ();
169 type Response = ();
170
171 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
172 parent_span.map_or_else(
173 || info_span!("ManualDistribution"),
174 |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
175 )
176 }
177}
178
179#[async_trait]
180impl Handler<Self> for ManualDistribution {
181 async fn handle_message(
182 &mut self,
183 _sender: ActorPath,
184 msg: ManualDistributionMessage,
185 ctx: &mut ave_actors::ActorContext<Self>,
186 ) -> Result<(), ActorError> {
187 match msg {
188 ManualDistributionMessage::Update(subject_id) => {
189 let data = i_can_send_last_ledger(ctx, &subject_id)
190 .await
191 .map_err(|e| {
192 error!(
193 msg_type = "Update",
194 subject_id = %subject_id,
195 error = %e,
196 "Failed to check if we can send last ledger"
197 );
198 e
199 })?;
200
201 let Some(data) = data else {
202 warn!(
203 msg_type = "Update",
204 subject_id = %subject_id,
205 "Not the owner of the subject nor rejected transfer"
206 );
207 return Err(ActorError::Functional {
208 description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
209 });
210 };
211
212 let is_gov = data.get_schema_id().is_gov();
213
214 let ledger = if is_gov {
215 get_last_ledger_event(ctx, &subject_id).await
216 } else {
217 let lease = acquire_subject(
218 ctx,
219 &subject_id,
220 format!("manual_distribution:{}", subject_id),
221 None,
222 true,
223 )
224 .await?;
225
226 let ledger = get_last_ledger_event(ctx, &subject_id).await;
227 lease.finish(ctx).await?;
228 ledger
229 };
230
231 let ledger = ledger.map_err(|e| {
232 error!(
233 msg_type = "Update",
234 subject_id = %subject_id,
235 error = %e,
236 "Failed to get last ledger event"
237 );
238 e
239 })?;
240
241 let Some(ledger) = ledger else {
242 error!(
243 msg_type = "Update",
244 subject_id = %subject_id,
245 "No ledger event found for subject"
246 );
247 return Err(ActorError::Functional {
248 description: "Cannot obtain last ledger event"
249 .to_string(),
250 });
251 };
252
253 let governance_id =
254 data.get_governance_id().as_ref().map_or_else(
255 || subject_id.clone(),
256 |governance_id| governance_id.clone(),
257 );
258
259 let schema_id = data.get_schema_id();
260 let recipients = if is_gov {
261 let gov =
262 get_gov(ctx, &governance_id).await.map_err(|e| {
263 error!(
264 msg_type = "Update",
265 subject_id = %subject_id,
266 governance_id = %governance_id,
267 error = %e,
268 "Failed to get governance"
269 );
270 e
271 })?;
272
273 let mut witnesses =
274 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
275 error!(
276 msg_type = "Update",
277 subject_id = %subject_id,
278 is_gov = is_gov,
279 error = %e,
280 "Failed to get witnesses from governance"
281 );
282 ActorError::Functional {
283 description: e.to_string(),
284 }
285 })?;
286 witnesses.remove(&*self.our_key);
287 witnesses
288 .into_iter()
289 .map(|node| DistributionPlanEntry {
290 node,
291 mode: DistributionPlanMode::Clear,
292 })
293 .collect::<Vec<_>>()
294 } else {
295 let gov =
296 get_gov(ctx, &governance_id).await.map_err(|e| {
297 error!(
298 msg_type = "Update",
299 subject_id = %subject_id,
300 governance_id = %governance_id,
301 error = %e,
302 "Failed to get governance"
303 );
304 e
305 })?;
306
307 let Some(event_request) = ledger.get_event_request() else {
308 return Err(ActorError::Functional {
309 description:
310 "Missing event request in tracker ledger"
311 .to_owned(),
312 });
313 };
314
315 Self::build_tracker_manual_plan(
316 &gov,
317 schema_id.clone(),
318 ave_common::Namespace::from(data.get_namespace()),
319 &event_request,
320 &ledger.ledger_seal_signature.signer,
321 )?
322 .into_iter()
323 .filter(|entry| entry.node != *self.our_key)
324 .collect::<Vec<_>>()
325 };
326
327 if recipients.is_empty() {
328 warn!(
329 msg_type = "Update",
330 subject_id = %subject_id,
331 "No witnesses available for manual distribution"
332 );
333 return Err(ActorError::Functional {
334 description: "No witnesses available to manually send the last ledger event".to_string()
335 });
336 }
337
338 let witnesses_count = recipients.len();
339
340 let Some(network) = ctx
341 .system()
342 .get_helper::<Arc<NetworkSender>>("network")
343 .await
344 else {
345 error!(
346 msg_type = "Update",
347 subject_id = %subject_id,
348 "Network helper not found"
349 );
350 return Err(ActorError::Helper {
351 name: "network".to_owned(),
352 reason: "Not found".to_owned(),
353 });
354 };
355
356 let distribution = Distribution::new(
357 network,
358 DistributionType::Manual,
359 DigestIdentifier::default(),
360 );
361
362 let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
363 warn!(
364 msg_type = "Update",
365 subject_id = %subject_id,
366 error = %e,
367 "Manual distribution already in progress"
368 );
369 ActorError::Functional {
370 description: "Manual distribution already in progress for this subject".to_owned()
371 }
372 })?;
373
374 if let Err(e) = distribution_actor
375 .tell(DistributionMessage::Create {
376 distribution_plan: recipients,
377 ledger: Box::new(ledger),
378 })
379 .await
380 {
381 error!(
382 msg_type = "Update",
383 subject_id = %subject_id,
384 witnesses_count = witnesses_count,
385 error = %e,
386 "Failed to start manual distribution"
387 );
388 return Err(ActorError::Functional {
389 description: format!(
390 "Failed to start manual distribution: {}",
391 e
392 ),
393 });
394 };
395
396 debug!(
397 msg_type = "Update",
398 subject_id = %subject_id,
399 witnesses_count = witnesses_count,
400 is_gov = is_gov,
401 "Manual distribution started successfully"
402 );
403
404 Ok(())
405 }
406 }
407 }
408
409 async fn on_child_fault(
410 &mut self,
411 error: ActorError,
412 ctx: &mut ActorContext<Self>,
413 ) -> ChildAction {
414 error!(
415 error = %error,
416 "Child actor fault in manual distribution"
417 );
418 emit_fail(ctx, error).await;
419 ChildAction::Stop
420 }
421}