ave_core/manual_distribution/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 identity::{DigestIdentifier, PublicKey},
10 request::EventRequest,
11 schematype::ReservedWords,
12};
13use serde::{Deserialize, Serialize};
14use tracing::{Span, debug, error, info_span, warn};
15
16use crate::{
17 distribution::{Distribution, DistributionMessage, DistributionType},
18 governance::{
19 data::GovernanceData,
20 model::{RoleTypes, WitnessesData},
21 },
22 helpers::network::service::NetworkSender,
23 model::common::{
24 emit_fail,
25 node::i_can_send_last_ledger,
26 subject::{
27 acquire_subject, get_gov, get_last_ledger_event,
28 },
29 },
30 request::types::{DistributionPlanEntry, DistributionPlanMode},
31};
32
33pub struct ManualDistribution {
34 our_key: Arc<PublicKey>,
35}
36
37impl ManualDistribution {
38 pub const fn new(our_key: Arc<PublicKey>) -> Self {
39 Self { our_key }
40 }
41
42 fn tracker_fact_mode_for_creator(
43 governance_data: &GovernanceData,
44 schema_id: &ave_common::SchemaType,
45 namespace: &ave_common::Namespace,
46 creator: &PublicKey,
47 witness: &PublicKey,
48 viewpoints: &std::collections::BTreeSet<String>,
49 ) -> DistributionPlanMode {
50 let Some(witness_name) = governance_data
51 .members
52 .iter()
53 .find(|(_, key)| *key == witness)
54 .map(|(name, _)| name.clone())
55 else {
56 return DistributionPlanMode::Opaque;
57 };
58
59 let Some(creator_name) = governance_data
60 .members
61 .iter()
62 .find(|(_, key)| *key == creator)
63 .map(|(name, _)| name.clone())
64 else {
65 return DistributionPlanMode::Opaque;
66 };
67
68 let Some(roles_schema) = governance_data.roles_schema.get(schema_id)
69 else {
70 return DistributionPlanMode::Opaque;
71 };
72
73 let Some(role_creator) =
74 roles_schema
75 .creator
76 .get(&ave_common::governance::RoleCreator::create(
77 &creator_name,
78 namespace.clone(),
79 ))
80 else {
81 return DistributionPlanMode::Opaque;
82 };
83
84 let is_generic_witness =
85 roles_schema.hash_this_rol(
86 RoleTypes::Witness,
87 namespace.clone(),
88 &witness_name,
89 ) || governance_data.roles_tracker_schemas.hash_this_rol(
90 RoleTypes::Witness,
91 namespace.clone(),
92 &witness_name,
93 );
94
95 let allows_clear =
96 role_creator.witnesses.iter().any(|creator_witness| {
97 let applies = creator_witness.name == witness_name
98 || (creator_witness.name
99 == ReservedWords::Witnesses.to_string()
100 && is_generic_witness);
101
102 if !applies {
103 return false;
104 }
105
106 creator_witness
107 .viewpoints
108 .contains(&ReservedWords::AllViewpoints.to_string())
109 || viewpoints.is_empty()
110 || viewpoints.is_subset(&creator_witness.viewpoints)
111 });
112
113 if allows_clear {
114 DistributionPlanMode::Clear
115 } else {
116 DistributionPlanMode::Opaque
117 }
118 }
119
120 fn build_tracker_manual_plan(
121 governance_data: &GovernanceData,
122 schema_id: ave_common::SchemaType,
123 namespace: ave_common::Namespace,
124 event_request: &EventRequest,
125 signer: &PublicKey,
126 ) -> Result<Vec<DistributionPlanEntry>, ActorError> {
127 let witnesses = governance_data
128 .get_witnesses(WitnessesData::Schema {
129 creator: signer.clone(),
130 schema_id: schema_id.clone(),
131 namespace: namespace.clone(),
132 })
133 .map_err(|e| ActorError::Functional {
134 description: e.to_string(),
135 })?;
136
137 Ok(witnesses
138 .into_iter()
139 .map(|node| {
140 let mode = match event_request {
141 EventRequest::Fact(fact_request) => {
142 Self::tracker_fact_mode_for_creator(
143 governance_data,
144 &schema_id,
145 &namespace,
146 signer,
147 &node,
148 &fact_request.viewpoints,
149 )
150 }
151 _ => DistributionPlanMode::Clear,
152 };
153
154 DistributionPlanEntry { node, mode }
155 })
156 .collect())
157 }
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize)]
161pub enum ManualDistributionMessage {
162 Update(DigestIdentifier),
163}
164
165impl Message for ManualDistributionMessage {}
166
167impl NotPersistentActor for ManualDistribution {}
168
169#[async_trait]
170impl Actor for ManualDistribution {
171 type Message = ManualDistributionMessage;
172 type Event = ();
173 type Response = ();
174
175 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
176 parent_span.map_or_else(
177 || info_span!("ManualDistribution"),
178 |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
179 )
180 }
181}
182
183#[async_trait]
184impl Handler<Self> for ManualDistribution {
185 async fn handle_message(
186 &mut self,
187 _sender: ActorPath,
188 msg: ManualDistributionMessage,
189 ctx: &mut ave_actors::ActorContext<Self>,
190 ) -> Result<(), ActorError> {
191 match msg {
192 ManualDistributionMessage::Update(subject_id) => {
193 let data = i_can_send_last_ledger(ctx, &subject_id)
194 .await
195 .map_err(|e| {
196 error!(
197 msg_type = "Update",
198 subject_id = %subject_id,
199 error = %e,
200 "Failed to check if we can send last ledger"
201 );
202 e
203 })?;
204
205 let Some(data) = data else {
206 warn!(
207 msg_type = "Update",
208 subject_id = %subject_id,
209 "Not the owner of the subject nor rejected transfer"
210 );
211 return Err(ActorError::Functional {
212 description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
213 });
214 };
215
216 let is_gov = data.get_schema_id().is_gov();
217
218 let ledger = if is_gov {
219 get_last_ledger_event(ctx, &subject_id).await
220 } else {
221 let lease = acquire_subject(
222 ctx,
223 &subject_id,
224 format!("manual_distribution:{}", subject_id),
225 None,
226 true,
227 )
228 .await?;
229
230 let ledger = get_last_ledger_event(ctx, &subject_id).await;
231 lease.finish(ctx).await?;
232 ledger
233 };
234
235 let ledger = ledger.map_err(|e| {
236 error!(
237 msg_type = "Update",
238 subject_id = %subject_id,
239 error = %e,
240 "Failed to get last ledger event"
241 );
242 e
243 })?;
244
245 let Some(ledger) = ledger else {
246 error!(
247 msg_type = "Update",
248 subject_id = %subject_id,
249 "No ledger event found for subject"
250 );
251 return Err(ActorError::Functional {
252 description: "Cannot obtain last ledger event"
253 .to_string(),
254 });
255 };
256
257 let governance_id =
258 data.get_governance_id().as_ref().map_or_else(
259 || subject_id.clone(),
260 |governance_id| governance_id.clone(),
261 );
262
263 let schema_id = data.get_schema_id();
264 let recipients = if is_gov {
265 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
266 error!(
267 msg_type = "Update",
268 subject_id = %subject_id,
269 governance_id = %governance_id,
270 error = %e,
271 "Failed to get governance"
272 );
273 e
274 })?;
275
276 let mut witnesses =
277 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
278 error!(
279 msg_type = "Update",
280 subject_id = %subject_id,
281 is_gov = is_gov,
282 error = %e,
283 "Failed to get witnesses from governance"
284 );
285 ActorError::Functional {
286 description: e.to_string(),
287 }
288 })?;
289 witnesses.remove(&*self.our_key);
290 witnesses
291 .into_iter()
292 .map(|node| DistributionPlanEntry {
293 node,
294 mode: DistributionPlanMode::Clear,
295 })
296 .collect::<Vec<_>>()
297 } else {
298 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
299 error!(
300 msg_type = "Update",
301 subject_id = %subject_id,
302 governance_id = %governance_id,
303 error = %e,
304 "Failed to get governance"
305 );
306 e
307 })?;
308
309 let Some(event_request) = ledger.get_event_request() else {
310 return Err(ActorError::Functional {
311 description: "Missing event request in tracker ledger".to_owned(),
312 });
313 };
314
315 Self::build_tracker_manual_plan(
316 &gov,
317 schema_id.clone(),
318 ave_common::Namespace::from(data.get_namespace()),
319 &event_request,
320 &ledger.ledger_seal_signature.signer,
321 )?
322 .into_iter()
323 .filter(|entry| entry.node != *self.our_key)
324 .collect::<Vec<_>>()
325 };
326
327 if recipients.is_empty() {
328 warn!(
329 msg_type = "Update",
330 subject_id = %subject_id,
331 "No witnesses available for manual distribution"
332 );
333 return Err(ActorError::Functional {
334 description: "No witnesses available to manually send the last ledger event".to_string()
335 });
336 }
337
338 let witnesses_count = recipients.len();
339
340 let Some(network) = ctx
341 .system()
342 .get_helper::<Arc<NetworkSender>>("network")
343 .await
344 else {
345 error!(
346 msg_type = "Update",
347 subject_id = %subject_id,
348 "Network helper not found"
349 );
350 return Err(ActorError::Helper {
351 name: "network".to_owned(),
352 reason: "Not found".to_owned(),
353 });
354 };
355
356 let distribution = Distribution::new(
357 network,
358 DistributionType::Manual,
359 DigestIdentifier::default(),
360 );
361
362 let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
363 warn!(
364 msg_type = "Update",
365 subject_id = %subject_id,
366 error = %e,
367 "Manual distribution already in progress"
368 );
369 ActorError::Functional {
370 description: "Manual distribution already in progress for this subject".to_owned()
371 }
372 })?;
373
374 if let Err(e) = distribution_actor
375 .tell(DistributionMessage::Create {
376 distribution_plan: recipients,
377 ledger: Box::new(ledger),
378 })
379 .await
380 {
381 error!(
382 msg_type = "Update",
383 subject_id = %subject_id,
384 witnesses_count = witnesses_count,
385 error = %e,
386 "Failed to start manual distribution"
387 );
388 return Err(ActorError::Functional {
389 description: format!(
390 "Failed to start manual distribution: {}",
391 e
392 ),
393 });
394 };
395
396 debug!(
397 msg_type = "Update",
398 subject_id = %subject_id,
399 witnesses_count = witnesses_count,
400 is_gov = is_gov,
401 "Manual distribution started successfully"
402 );
403
404 Ok(())
405 }
406 }
407 }
408
409 async fn on_child_fault(
410 &mut self,
411 error: ActorError,
412 ctx: &mut ActorContext<Self>,
413 ) -> ChildAction {
414 error!(
415 error = %error,
416 "Child actor fault in manual distribution"
417 );
418 emit_fail(ctx, error).await;
419 ChildAction::Stop
420 }
421}