Skip to main content

ave_core/manual_distribution/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    Namespace,
10    identity::{DigestIdentifier, PublicKey},
11};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span, warn};
14
15use crate::{
16    distribution::{Distribution, DistributionMessage, DistributionType},
17    governance::model::WitnessesData,
18    helpers::network::service::NetworkSender,
19    model::common::{
20        emit_fail,
21        node::i_can_send_last_ledger,
22        subject::{get_gov, get_last_ledger_event},
23    },
24};
25
26pub struct ManualDistribution {
27    our_key: Arc<PublicKey>,
28}
29
30impl ManualDistribution {
31    pub const fn new(our_key: Arc<PublicKey>) -> Self {
32        Self { our_key }
33    }
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub enum ManualDistributionMessage {
38    Update(DigestIdentifier),
39}
40
41impl Message for ManualDistributionMessage {}
42
43impl NotPersistentActor for ManualDistribution {}
44
45#[async_trait]
46impl Actor for ManualDistribution {
47    type Message = ManualDistributionMessage;
48    type Event = ();
49    type Response = ();
50
51    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
52        parent_span.map_or_else(
53            || info_span!("ManualDistribution"),
54            |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
55        )
56    }
57}
58
59#[async_trait]
60impl Handler<Self> for ManualDistribution {
61    async fn handle_message(
62        &mut self,
63        _sender: ActorPath,
64        msg: ManualDistributionMessage,
65        ctx: &mut ave_actors::ActorContext<Self>,
66    ) -> Result<(), ActorError> {
67        match msg {
68            ManualDistributionMessage::Update(subject_id) => {
69                let data = i_can_send_last_ledger(ctx, &subject_id)
70                    .await
71                    .map_err(|e| {
72                        error!(
73                            msg_type = "Update",
74                            subject_id = %subject_id,
75                            error = %e,
76                            "Failed to check if we can send last ledger"
77                        );
78                        e
79                    })?;
80
81                let Some(data) = data else {
82                    warn!(
83                        msg_type = "Update",
84                        subject_id = %subject_id,
85                        "Not the owner of the subject nor rejected transfer"
86                    );
87                    return Err(ActorError::Functional {
88                        description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
89                    });
90                };
91
92                let ledger = get_last_ledger_event(ctx, &subject_id)
93                    .await
94                    .map_err(|e| {
95                        error!(
96                            msg_type = "Update",
97                            subject_id = %subject_id,
98                            error = %e,
99                            "Failed to get last ledger event"
100                        );
101                        e
102                    })?;
103
104                let Some(ledger) = ledger else {
105                    error!(
106                        msg_type = "Update",
107                        subject_id = %subject_id,
108                        "No ledger event found for subject"
109                    );
110                    return Err(ActorError::Functional {
111                        description: "Cannot obtain last ledger event"
112                            .to_string(),
113                    });
114                };
115
116                let governance_id =
117                    data.get_governance_id().as_ref().map_or_else(
118                        || subject_id.clone(),
119                        |governance_id| governance_id.clone(),
120                    );
121
122                let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
123                    error!(
124                        msg_type = "Update",
125                        subject_id = %subject_id,
126                        governance_id = %governance_id,
127                        error = %e,
128                        "Failed to get governance"
129                    );
130                    e
131                })?;
132
133                let schema_id = data.get_schema_id();
134
135                let is_gov = schema_id.is_gov();
136                let witnesses_data = if is_gov {
137                    WitnessesData::Gov
138                } else {
139                    WitnessesData::Schema {
140                        creator: (*self.our_key).clone(),
141                        schema_id: schema_id.clone(),
142                        namespace: Namespace::from(data.get_namespace()),
143                    }
144                };
145
146                let mut witnesses =
147                    gov.get_witnesses(witnesses_data).map_err(|e| {
148                        error!(
149                            msg_type = "Update",
150                            subject_id = %subject_id,
151                            is_gov = is_gov,
152                            error = %e,
153                            "Failed to get witnesses from governance"
154                        );
155                        ActorError::Functional {
156                            description: e.to_string(),
157                        }
158                    })?;
159
160                witnesses.remove(&*self.our_key);
161                if witnesses.is_empty() {
162                    warn!(
163                        msg_type = "Update",
164                        subject_id = %subject_id,
165                        "No witnesses available for manual distribution"
166                    );
167                    return Err(ActorError::Functional {
168                        description: "No witnesses available to manually send the last ledger event".to_string()
169                    });
170                }
171
172                let witnesses_count = witnesses.len();
173
174                let Some(network) = ctx
175                    .system()
176                    .get_helper::<Arc<NetworkSender>>("network")
177                    .await
178                else {
179                    error!(
180                        msg_type = "Update",
181                        subject_id = %subject_id,
182                        "Network helper not found"
183                    );
184                    return Err(ActorError::Helper {
185                        name: "network".to_owned(),
186                        reason: "Not found".to_owned(),
187                    });
188                };
189
190                let distribution = Distribution::new(
191                    network,
192                    DistributionType::Manual,
193                    DigestIdentifier::default(),
194                );
195
196                let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
197                    warn!(
198                        msg_type = "Update",
199                        subject_id = %subject_id,
200                        error = %e,
201                        "Manual distribution already in progress"
202                    );
203                    ActorError::Functional {
204                        description: "Manual distribution already in progress for this subject".to_owned()
205                    }
206                })?;
207
208                if let Err(e) = distribution_actor
209                    .tell(DistributionMessage::Create {
210                        witnesses: witnesses.clone(),
211                        ledger: Box::new(ledger),
212                    })
213                    .await
214                {
215                    error!(
216                        msg_type = "Update",
217                        subject_id = %subject_id,
218                        witnesses_count = witnesses_count,
219                        error = %e,
220                        "Failed to start manual distribution"
221                    );
222                    return Err(ActorError::Functional {
223                        description: format!(
224                            "Failed to start manual distribution: {}",
225                            e
226                        ),
227                    });
228                };
229
230                debug!(
231                    msg_type = "Update",
232                    subject_id = %subject_id,
233                    witnesses_count = witnesses_count,
234                    is_gov = is_gov,
235                    "Manual distribution started successfully"
236                );
237
238                Ok(())
239            }
240        }
241    }
242
243    async fn on_child_fault(
244        &mut self,
245        error: ActorError,
246        ctx: &mut ActorContext<Self>,
247    ) -> ChildAction {
248        error!(
249            error = %error,
250            "Child actor fault in manual distribution"
251        );
252        emit_fail(ctx, error).await;
253        ChildAction::Stop
254    }
255}