// ave_core/manual_distribution/mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    identity::{DigestIdentifier, PublicKey},
10    request::EventRequest,
11    schematype::ReservedWords,
12};
13use serde::{Deserialize, Serialize};
14use tracing::{Span, debug, error, info_span, warn};
15
16use crate::{
17    distribution::{Distribution, DistributionMessage, DistributionType},
18    governance::{
19        data::GovernanceData,
20        model::{RoleTypes, WitnessesData},
21    },
22    helpers::network::service::NetworkSender,
23    model::common::{
24        emit_fail,
25        node::i_can_send_last_ledger,
26        subject::{
27            acquire_subject, get_gov, get_last_ledger_event,
28        },
29    },
30    request::types::{DistributionPlanEntry, DistributionPlanMode},
31};
32
33pub struct ManualDistribution {
34    our_key: Arc<PublicKey>,
35}
36
37impl ManualDistribution {
38    pub const fn new(our_key: Arc<PublicKey>) -> Self {
39        Self { our_key }
40    }
41
42    fn tracker_fact_mode_for_creator(
43        governance_data: &GovernanceData,
44        schema_id: &ave_common::SchemaType,
45        namespace: &ave_common::Namespace,
46        creator: &PublicKey,
47        witness: &PublicKey,
48        viewpoints: &std::collections::BTreeSet<String>,
49    ) -> DistributionPlanMode {
50        let Some(witness_name) = governance_data
51            .members
52            .iter()
53            .find(|(_, key)| *key == witness)
54            .map(|(name, _)| name.clone())
55        else {
56            return DistributionPlanMode::Opaque;
57        };
58
59        let Some(creator_name) = governance_data
60            .members
61            .iter()
62            .find(|(_, key)| *key == creator)
63            .map(|(name, _)| name.clone())
64        else {
65            return DistributionPlanMode::Opaque;
66        };
67
68        let Some(roles_schema) = governance_data.roles_schema.get(schema_id)
69        else {
70            return DistributionPlanMode::Opaque;
71        };
72
73        let Some(role_creator) =
74            roles_schema
75                .creator
76                .get(&ave_common::governance::RoleCreator::create(
77                    &creator_name,
78                    namespace.clone(),
79                ))
80        else {
81            return DistributionPlanMode::Opaque;
82        };
83
84        let is_generic_witness =
85            roles_schema.hash_this_rol(
86                RoleTypes::Witness,
87                namespace.clone(),
88                &witness_name,
89            ) || governance_data.roles_tracker_schemas.hash_this_rol(
90                RoleTypes::Witness,
91                namespace.clone(),
92                &witness_name,
93            );
94
95        let allows_clear =
96            role_creator.witnesses.iter().any(|creator_witness| {
97                let applies = creator_witness.name == witness_name
98                    || (creator_witness.name
99                        == ReservedWords::Witnesses.to_string()
100                        && is_generic_witness);
101
102                if !applies {
103                    return false;
104                }
105
106                creator_witness
107                    .viewpoints
108                    .contains(&ReservedWords::AllViewpoints.to_string())
109                    || viewpoints.is_empty()
110                    || viewpoints.is_subset(&creator_witness.viewpoints)
111            });
112
113        if allows_clear {
114            DistributionPlanMode::Clear
115        } else {
116            DistributionPlanMode::Opaque
117        }
118    }
119
120    fn build_tracker_manual_plan(
121        governance_data: &GovernanceData,
122        schema_id: ave_common::SchemaType,
123        namespace: ave_common::Namespace,
124        event_request: &EventRequest,
125        signer: &PublicKey,
126    ) -> Result<Vec<DistributionPlanEntry>, ActorError> {
127        let witnesses = governance_data
128            .get_witnesses(WitnessesData::Schema {
129                creator: signer.clone(),
130                schema_id: schema_id.clone(),
131                namespace: namespace.clone(),
132            })
133            .map_err(|e| ActorError::Functional {
134                description: e.to_string(),
135            })?;
136
137        Ok(witnesses
138            .into_iter()
139            .map(|node| {
140                let mode = match event_request {
141                    EventRequest::Fact(fact_request) => {
142                        Self::tracker_fact_mode_for_creator(
143                            governance_data,
144                            &schema_id,
145                            &namespace,
146                            signer,
147                            &node,
148                            &fact_request.viewpoints,
149                        )
150                    }
151                    _ => DistributionPlanMode::Clear,
152                };
153
154                DistributionPlanEntry { node, mode }
155            })
156            .collect())
157    }
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize)]
161pub enum ManualDistributionMessage {
162    Update(DigestIdentifier),
163}
164
165impl Message for ManualDistributionMessage {}
166
167impl NotPersistentActor for ManualDistribution {}
168
169#[async_trait]
170impl Actor for ManualDistribution {
171    type Message = ManualDistributionMessage;
172    type Event = ();
173    type Response = ();
174
175    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
176        parent_span.map_or_else(
177            || info_span!("ManualDistribution"),
178            |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
179        )
180    }
181}
182
183#[async_trait]
184impl Handler<Self> for ManualDistribution {
185    async fn handle_message(
186        &mut self,
187        _sender: ActorPath,
188        msg: ManualDistributionMessage,
189        ctx: &mut ave_actors::ActorContext<Self>,
190    ) -> Result<(), ActorError> {
191        match msg {
192            ManualDistributionMessage::Update(subject_id) => {
193                let data = i_can_send_last_ledger(ctx, &subject_id)
194                    .await
195                    .map_err(|e| {
196                        error!(
197                            msg_type = "Update",
198                            subject_id = %subject_id,
199                            error = %e,
200                            "Failed to check if we can send last ledger"
201                        );
202                        e
203                    })?;
204
205                let Some(data) = data else {
206                    warn!(
207                        msg_type = "Update",
208                        subject_id = %subject_id,
209                        "Not the owner of the subject nor rejected transfer"
210                    );
211                    return Err(ActorError::Functional {
212                        description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
213                    });
214                };
215
216                let is_gov = data.get_schema_id().is_gov();
217
218                let ledger = if is_gov {
219                    get_last_ledger_event(ctx, &subject_id).await
220                } else {
221                    let lease = acquire_subject(
222                        ctx,
223                        &subject_id,
224                        format!("manual_distribution:{}", subject_id),
225                        None,
226                        true,
227                    )
228                    .await?;
229
230                    let ledger = get_last_ledger_event(ctx, &subject_id).await;
231                    lease.finish(ctx).await?;
232                    ledger
233                };
234
235                let ledger = ledger.map_err(|e| {
236                    error!(
237                        msg_type = "Update",
238                        subject_id = %subject_id,
239                        error = %e,
240                        "Failed to get last ledger event"
241                    );
242                    e
243                })?;
244
245                let Some(ledger) = ledger else {
246                    error!(
247                        msg_type = "Update",
248                        subject_id = %subject_id,
249                        "No ledger event found for subject"
250                    );
251                    return Err(ActorError::Functional {
252                        description: "Cannot obtain last ledger event"
253                            .to_string(),
254                    });
255                };
256
257                let governance_id =
258                    data.get_governance_id().as_ref().map_or_else(
259                        || subject_id.clone(),
260                        |governance_id| governance_id.clone(),
261                    );
262
263                let schema_id = data.get_schema_id();
264                let recipients = if is_gov {
265                    let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
266                        error!(
267                            msg_type = "Update",
268                            subject_id = %subject_id,
269                            governance_id = %governance_id,
270                            error = %e,
271                            "Failed to get governance"
272                        );
273                        e
274                    })?;
275
276                    let mut witnesses =
277                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
278                            error!(
279                                msg_type = "Update",
280                                subject_id = %subject_id,
281                                is_gov = is_gov,
282                                error = %e,
283                                "Failed to get witnesses from governance"
284                            );
285                            ActorError::Functional {
286                                description: e.to_string(),
287                            }
288                        })?;
289                    witnesses.remove(&*self.our_key);
290                    witnesses
291                        .into_iter()
292                        .map(|node| DistributionPlanEntry {
293                            node,
294                            mode: DistributionPlanMode::Clear,
295                        })
296                        .collect::<Vec<_>>()
297                } else {
298                    let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
299                        error!(
300                            msg_type = "Update",
301                            subject_id = %subject_id,
302                            governance_id = %governance_id,
303                            error = %e,
304                            "Failed to get governance"
305                        );
306                        e
307                    })?;
308
309                    let Some(event_request) = ledger.get_event_request() else {
310                        return Err(ActorError::Functional {
311                            description: "Missing event request in tracker ledger".to_owned(),
312                        });
313                    };
314
315                    Self::build_tracker_manual_plan(
316                        &gov,
317                        schema_id.clone(),
318                        ave_common::Namespace::from(data.get_namespace()),
319                        &event_request,
320                        &ledger.ledger_seal_signature.signer,
321                    )?
322                    .into_iter()
323                    .filter(|entry| entry.node != *self.our_key)
324                    .collect::<Vec<_>>()
325                };
326
327                if recipients.is_empty() {
328                    warn!(
329                        msg_type = "Update",
330                        subject_id = %subject_id,
331                        "No witnesses available for manual distribution"
332                    );
333                    return Err(ActorError::Functional {
334                        description: "No witnesses available to manually send the last ledger event".to_string()
335                    });
336                }
337
338                let witnesses_count = recipients.len();
339
340                let Some(network) = ctx
341                    .system()
342                    .get_helper::<Arc<NetworkSender>>("network")
343                    .await
344                else {
345                    error!(
346                        msg_type = "Update",
347                        subject_id = %subject_id,
348                        "Network helper not found"
349                    );
350                    return Err(ActorError::Helper {
351                        name: "network".to_owned(),
352                        reason: "Not found".to_owned(),
353                    });
354                };
355
356                let distribution = Distribution::new(
357                    network,
358                    DistributionType::Manual,
359                    DigestIdentifier::default(),
360                );
361
362                let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
363                    warn!(
364                        msg_type = "Update",
365                        subject_id = %subject_id,
366                        error = %e,
367                        "Manual distribution already in progress"
368                    );
369                    ActorError::Functional {
370                        description: "Manual distribution already in progress for this subject".to_owned()
371                    }
372                })?;
373
374                if let Err(e) = distribution_actor
375                    .tell(DistributionMessage::Create {
376                        distribution_plan: recipients,
377                        ledger: Box::new(ledger),
378                    })
379                    .await
380                {
381                    error!(
382                        msg_type = "Update",
383                        subject_id = %subject_id,
384                        witnesses_count = witnesses_count,
385                        error = %e,
386                        "Failed to start manual distribution"
387                    );
388                    return Err(ActorError::Functional {
389                        description: format!(
390                            "Failed to start manual distribution: {}",
391                            e
392                        ),
393                    });
394                };
395
396                debug!(
397                    msg_type = "Update",
398                    subject_id = %subject_id,
399                    witnesses_count = witnesses_count,
400                    is_gov = is_gov,
401                    "Manual distribution started successfully"
402                );
403
404                Ok(())
405            }
406        }
407    }
408
409    async fn on_child_fault(
410        &mut self,
411        error: ActorError,
412        ctx: &mut ActorContext<Self>,
413    ) -> ChildAction {
414        error!(
415            error = %error,
416            "Child actor fault in manual distribution"
417        );
418        emit_fail(ctx, error).await;
419        ChildAction::Stop
420    }
421}