Skip to main content

ave_core/manual_distribution/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    identity::{DigestIdentifier, PublicKey},
10    request::EventRequest,
11    schematype::ReservedWords,
12};
13use serde::{Deserialize, Serialize};
14use tracing::{Span, debug, error, info_span, warn};
15
16use crate::{
17    distribution::{Distribution, DistributionMessage, DistributionType},
18    governance::{
19        data::GovernanceData,
20        model::{RoleTypes, WitnessesData},
21    },
22    helpers::network::service::NetworkSender,
23    model::common::{
24        emit_fail,
25        node::i_can_send_last_ledger,
26        subject::{acquire_subject, get_gov, get_last_ledger_event},
27    },
28    request::types::{DistributionPlanEntry, DistributionPlanMode},
29};
30
/// Actor state for handling [`ManualDistributionMessage`] requests.
pub struct ManualDistribution {
    /// Public key that the handler excludes from the recipient list it
    /// builds (presumably this node's own key — confirm with the caller of
    /// `new`).
    our_key: Arc<PublicKey>,
}
34
35impl ManualDistribution {
36    pub const fn new(our_key: Arc<PublicKey>) -> Self {
37        Self { our_key }
38    }
39
40    fn tracker_fact_mode_for_creator(
41        governance_data: &GovernanceData,
42        schema_id: &ave_common::SchemaType,
43        namespace: &ave_common::Namespace,
44        creator: &PublicKey,
45        witness: &PublicKey,
46        viewpoints: &std::collections::BTreeSet<String>,
47    ) -> DistributionPlanMode {
48        let Some(witness_name) = governance_data
49            .members
50            .iter()
51            .find(|(_, key)| *key == witness)
52            .map(|(name, _)| name.clone())
53        else {
54            return DistributionPlanMode::Opaque;
55        };
56
57        let Some(creator_name) = governance_data
58            .members
59            .iter()
60            .find(|(_, key)| *key == creator)
61            .map(|(name, _)| name.clone())
62        else {
63            return DistributionPlanMode::Opaque;
64        };
65
66        let Some(roles_schema) = governance_data.roles_schema.get(schema_id)
67        else {
68            return DistributionPlanMode::Opaque;
69        };
70
71        let Some(role_creator) = roles_schema.creator.get(
72            &ave_common::governance::RoleCreator::create(
73                &creator_name,
74                namespace.clone(),
75            ),
76        ) else {
77            return DistributionPlanMode::Opaque;
78        };
79
80        let is_generic_witness =
81            roles_schema.hash_this_rol(
82                RoleTypes::Witness,
83                namespace.clone(),
84                &witness_name,
85            ) || governance_data.roles_tracker_schemas.hash_this_rol(
86                RoleTypes::Witness,
87                namespace.clone(),
88                &witness_name,
89            );
90
91        let allows_clear =
92            role_creator.witnesses.iter().any(|creator_witness| {
93                let applies = creator_witness.name == witness_name
94                    || (creator_witness.name
95                        == ReservedWords::Witnesses.to_string()
96                        && is_generic_witness);
97
98                if !applies {
99                    return false;
100                }
101
102                creator_witness
103                    .viewpoints
104                    .contains(&ReservedWords::AllViewpoints.to_string())
105                    || viewpoints.is_empty()
106                    || viewpoints.is_subset(&creator_witness.viewpoints)
107            });
108
109        if allows_clear {
110            DistributionPlanMode::Clear
111        } else {
112            DistributionPlanMode::Opaque
113        }
114    }
115
116    fn build_tracker_manual_plan(
117        governance_data: &GovernanceData,
118        schema_id: ave_common::SchemaType,
119        namespace: ave_common::Namespace,
120        event_request: &EventRequest,
121        signer: &PublicKey,
122    ) -> Result<Vec<DistributionPlanEntry>, ActorError> {
123        let witnesses = governance_data
124            .get_witnesses(WitnessesData::Schema {
125                creator: signer.clone(),
126                schema_id: schema_id.clone(),
127                namespace: namespace.clone(),
128            })
129            .map_err(|e| ActorError::Functional {
130                description: e.to_string(),
131            })?;
132
133        Ok(witnesses
134            .into_iter()
135            .map(|node| {
136                let mode = match event_request {
137                    EventRequest::Fact(fact_request) => {
138                        Self::tracker_fact_mode_for_creator(
139                            governance_data,
140                            &schema_id,
141                            &namespace,
142                            signer,
143                            &node,
144                            &fact_request.viewpoints,
145                        )
146                    }
147                    _ => DistributionPlanMode::Clear,
148                };
149
150                DistributionPlanEntry { node, mode }
151            })
152            .collect())
153    }
154}
155
/// Messages accepted by the [`ManualDistribution`] actor.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ManualDistributionMessage {
    /// Requests a manual distribution; the handler uses the identifier as
    /// the subject id whose last ledger event is sent to the witnesses.
    Update(DigestIdentifier),
}
160
// Implements the actor `Message` trait without overriding any of its items.
impl Message for ManualDistributionMessage {}
162
// Implements `NotPersistentActor` without overriding any of its items.
impl NotPersistentActor for ManualDistribution {}
164
165#[async_trait]
166impl Actor for ManualDistribution {
167    type Message = ManualDistributionMessage;
168    type Event = ();
169    type Response = ();
170
171    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
172        parent_span.map_or_else(
173            || info_span!("ManualDistribution"),
174            |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
175        )
176    }
177}
178
179#[async_trait]
180impl Handler<Self> for ManualDistribution {
181    async fn handle_message(
182        &mut self,
183        _sender: ActorPath,
184        msg: ManualDistributionMessage,
185        ctx: &mut ave_actors::ActorContext<Self>,
186    ) -> Result<(), ActorError> {
187        match msg {
188            ManualDistributionMessage::Update(subject_id) => {
189                let data = i_can_send_last_ledger(ctx, &subject_id)
190                    .await
191                    .map_err(|e| {
192                        error!(
193                            msg_type = "Update",
194                            subject_id = %subject_id,
195                            error = %e,
196                            "Failed to check if we can send last ledger"
197                        );
198                        e
199                    })?;
200
201                let Some(data) = data else {
202                    warn!(
203                        msg_type = "Update",
204                        subject_id = %subject_id,
205                        "Not the owner of the subject nor rejected transfer"
206                    );
207                    return Err(ActorError::Functional {
208                        description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
209                    });
210                };
211
212                let is_gov = data.get_schema_id().is_gov();
213
214                let ledger = if is_gov {
215                    get_last_ledger_event(ctx, &subject_id).await
216                } else {
217                    let lease = acquire_subject(
218                        ctx,
219                        &subject_id,
220                        format!("manual_distribution:{}", subject_id),
221                        None,
222                        true,
223                    )
224                    .await?;
225
226                    let ledger = get_last_ledger_event(ctx, &subject_id).await;
227                    lease.finish(ctx).await?;
228                    ledger
229                };
230
231                let ledger = ledger.map_err(|e| {
232                    error!(
233                        msg_type = "Update",
234                        subject_id = %subject_id,
235                        error = %e,
236                        "Failed to get last ledger event"
237                    );
238                    e
239                })?;
240
241                let Some(ledger) = ledger else {
242                    error!(
243                        msg_type = "Update",
244                        subject_id = %subject_id,
245                        "No ledger event found for subject"
246                    );
247                    return Err(ActorError::Functional {
248                        description: "Cannot obtain last ledger event"
249                            .to_string(),
250                    });
251                };
252
253                let governance_id =
254                    data.get_governance_id().as_ref().map_or_else(
255                        || subject_id.clone(),
256                        |governance_id| governance_id.clone(),
257                    );
258
259                let schema_id = data.get_schema_id();
260                let recipients = if is_gov {
261                    let gov =
262                        get_gov(ctx, &governance_id).await.map_err(|e| {
263                            error!(
264                                msg_type = "Update",
265                                subject_id = %subject_id,
266                                governance_id = %governance_id,
267                                error = %e,
268                                "Failed to get governance"
269                            );
270                            e
271                        })?;
272
273                    let mut witnesses =
274                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
275                            error!(
276                                msg_type = "Update",
277                                subject_id = %subject_id,
278                                is_gov = is_gov,
279                                error = %e,
280                                "Failed to get witnesses from governance"
281                            );
282                            ActorError::Functional {
283                                description: e.to_string(),
284                            }
285                        })?;
286                    witnesses.remove(&*self.our_key);
287                    witnesses
288                        .into_iter()
289                        .map(|node| DistributionPlanEntry {
290                            node,
291                            mode: DistributionPlanMode::Clear,
292                        })
293                        .collect::<Vec<_>>()
294                } else {
295                    let gov =
296                        get_gov(ctx, &governance_id).await.map_err(|e| {
297                            error!(
298                                msg_type = "Update",
299                                subject_id = %subject_id,
300                                governance_id = %governance_id,
301                                error = %e,
302                                "Failed to get governance"
303                            );
304                            e
305                        })?;
306
307                    let Some(event_request) = ledger.get_event_request() else {
308                        return Err(ActorError::Functional {
309                            description:
310                                "Missing event request in tracker ledger"
311                                    .to_owned(),
312                        });
313                    };
314
315                    Self::build_tracker_manual_plan(
316                        &gov,
317                        schema_id.clone(),
318                        ave_common::Namespace::from(data.get_namespace()),
319                        &event_request,
320                        &ledger.ledger_seal_signature.signer,
321                    )?
322                    .into_iter()
323                    .filter(|entry| entry.node != *self.our_key)
324                    .collect::<Vec<_>>()
325                };
326
327                if recipients.is_empty() {
328                    warn!(
329                        msg_type = "Update",
330                        subject_id = %subject_id,
331                        "No witnesses available for manual distribution"
332                    );
333                    return Err(ActorError::Functional {
334                        description: "No witnesses available to manually send the last ledger event".to_string()
335                    });
336                }
337
338                let witnesses_count = recipients.len();
339
340                let Some(network) = ctx
341                    .system()
342                    .get_helper::<Arc<NetworkSender>>("network")
343                    .await
344                else {
345                    error!(
346                        msg_type = "Update",
347                        subject_id = %subject_id,
348                        "Network helper not found"
349                    );
350                    return Err(ActorError::Helper {
351                        name: "network".to_owned(),
352                        reason: "Not found".to_owned(),
353                    });
354                };
355
356                let distribution = Distribution::new(
357                    network,
358                    DistributionType::Manual,
359                    DigestIdentifier::default(),
360                );
361
362                let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
363                    warn!(
364                        msg_type = "Update",
365                        subject_id = %subject_id,
366                        error = %e,
367                        "Manual distribution already in progress"
368                    );
369                    ActorError::Functional {
370                        description: "Manual distribution already in progress for this subject".to_owned()
371                    }
372                })?;
373
374                if let Err(e) = distribution_actor
375                    .tell(DistributionMessage::Create {
376                        distribution_plan: recipients,
377                        ledger: Box::new(ledger),
378                    })
379                    .await
380                {
381                    error!(
382                        msg_type = "Update",
383                        subject_id = %subject_id,
384                        witnesses_count = witnesses_count,
385                        error = %e,
386                        "Failed to start manual distribution"
387                    );
388                    return Err(ActorError::Functional {
389                        description: format!(
390                            "Failed to start manual distribution: {}",
391                            e
392                        ),
393                    });
394                };
395
396                debug!(
397                    msg_type = "Update",
398                    subject_id = %subject_id,
399                    witnesses_count = witnesses_count,
400                    is_gov = is_gov,
401                    "Manual distribution started successfully"
402                );
403
404                Ok(())
405            }
406        }
407    }
408
409    async fn on_child_fault(
410        &mut self,
411        error: ActorError,
412        ctx: &mut ActorContext<Self>,
413    ) -> ChildAction {
414        error!(
415            error = %error,
416            "Child actor fault in manual distribution"
417        );
418        emit_fail(ctx, error).await;
419        ChildAction::Stop
420    }
421}