// ave_core/manual_distribution/mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    Namespace,
10    identity::{DigestIdentifier, PublicKey},
11};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span, warn};
14
15use crate::{
16    distribution::{Distribution, DistributionMessage, DistributionType},
17    governance::model::WitnessesData,
18    helpers::network::service::NetworkSender,
19    model::common::{
20        emit_fail,
21        node::i_can_send_last_ledger,
22        subject::{acquire_subject, get_gov, get_last_ledger_event},
23    },
24};
25
26pub struct ManualDistribution {
27    our_key: Arc<PublicKey>,
28}
29
30impl ManualDistribution {
31    pub const fn new(our_key: Arc<PublicKey>) -> Self {
32        Self { our_key }
33    }
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub enum ManualDistributionMessage {
38    Update(DigestIdentifier),
39}
40
41impl Message for ManualDistributionMessage {}
42
43impl NotPersistentActor for ManualDistribution {}
44
45#[async_trait]
46impl Actor for ManualDistribution {
47    type Message = ManualDistributionMessage;
48    type Event = ();
49    type Response = ();
50
51    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
52        parent_span.map_or_else(
53            || info_span!("ManualDistribution"),
54            |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
55        )
56    }
57}
58
59#[async_trait]
60impl Handler<Self> for ManualDistribution {
61    async fn handle_message(
62        &mut self,
63        _sender: ActorPath,
64        msg: ManualDistributionMessage,
65        ctx: &mut ave_actors::ActorContext<Self>,
66    ) -> Result<(), ActorError> {
67        match msg {
68            ManualDistributionMessage::Update(subject_id) => {
69                let data = i_can_send_last_ledger(ctx, &subject_id)
70                    .await
71                    .map_err(|e| {
72                        error!(
73                            msg_type = "Update",
74                            subject_id = %subject_id,
75                            error = %e,
76                            "Failed to check if we can send last ledger"
77                        );
78                        e
79                    })?;
80
81                let Some(data) = data else {
82                    warn!(
83                        msg_type = "Update",
84                        subject_id = %subject_id,
85                        "Not the owner of the subject nor rejected transfer"
86                    );
87                    return Err(ActorError::Functional {
88                        description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
89                    });
90                };
91
92                let is_tracker = data.get_governance_id().is_some();
93                let ledger = if is_tracker {
94                    let lease = acquire_subject(
95                        ctx,
96                        &subject_id,
97                        format!("manual_distribution:{}", subject_id),
98                        None,
99                        true,
100                    )
101                    .await?;
102                    let ledger = get_last_ledger_event(ctx, &subject_id).await;
103                    lease.finish(ctx).await?;
104                    ledger
105                } else {
106                    get_last_ledger_event(ctx, &subject_id).await
107                }
108                .map_err(|e| {
109                    error!(
110                        msg_type = "Update",
111                        subject_id = %subject_id,
112                        error = %e,
113                        "Failed to get last ledger event"
114                    );
115                    e
116                })?;
117
118                let Some(ledger) = ledger else {
119                    error!(
120                        msg_type = "Update",
121                        subject_id = %subject_id,
122                        "No ledger event found for subject"
123                    );
124                    return Err(ActorError::Functional {
125                        description: "Cannot obtain last ledger event"
126                            .to_string(),
127                    });
128                };
129
130                let governance_id =
131                    data.get_governance_id().as_ref().map_or_else(
132                        || subject_id.clone(),
133                        |governance_id| governance_id.clone(),
134                    );
135
136                let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
137                    error!(
138                        msg_type = "Update",
139                        subject_id = %subject_id,
140                        governance_id = %governance_id,
141                        error = %e,
142                        "Failed to get governance"
143                    );
144                    e
145                })?;
146
147                let schema_id = data.get_schema_id();
148
149                let is_gov = schema_id.is_gov();
150                let witnesses_data = if is_gov {
151                    WitnessesData::Gov
152                } else {
153                    WitnessesData::Schema {
154                        creator: (*self.our_key).clone(),
155                        schema_id: schema_id.clone(),
156                        namespace: Namespace::from(data.get_namespace()),
157                    }
158                };
159
160                let mut witnesses =
161                    gov.get_witnesses(witnesses_data).map_err(|e| {
162                        error!(
163                            msg_type = "Update",
164                            subject_id = %subject_id,
165                            is_gov = is_gov,
166                            error = %e,
167                            "Failed to get witnesses from governance"
168                        );
169                        ActorError::Functional {
170                            description: e.to_string(),
171                        }
172                    })?;
173
174                witnesses.remove(&*self.our_key);
175                if witnesses.is_empty() {
176                    warn!(
177                        msg_type = "Update",
178                        subject_id = %subject_id,
179                        "No witnesses available for manual distribution"
180                    );
181                    return Err(ActorError::Functional {
182                        description: "No witnesses available to manually send the last ledger event".to_string()
183                    });
184                }
185
186                let witnesses_count = witnesses.len();
187
188                let Some(network) = ctx
189                    .system()
190                    .get_helper::<Arc<NetworkSender>>("network")
191                    .await
192                else {
193                    error!(
194                        msg_type = "Update",
195                        subject_id = %subject_id,
196                        "Network helper not found"
197                    );
198                    return Err(ActorError::Helper {
199                        name: "network".to_owned(),
200                        reason: "Not found".to_owned(),
201                    });
202                };
203
204                let distribution = Distribution::new(
205                    network,
206                    DistributionType::Manual,
207                    DigestIdentifier::default(),
208                );
209
210                let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
211                    warn!(
212                        msg_type = "Update",
213                        subject_id = %subject_id,
214                        error = %e,
215                        "Manual distribution already in progress"
216                    );
217                    ActorError::Functional {
218                        description: "Manual distribution already in progress for this subject".to_owned()
219                    }
220                })?;
221
222                if let Err(e) = distribution_actor
223                    .tell(DistributionMessage::Create {
224                        witnesses: witnesses.clone(),
225                        ledger: Box::new(ledger),
226                    })
227                    .await
228                {
229                    error!(
230                        msg_type = "Update",
231                        subject_id = %subject_id,
232                        witnesses_count = witnesses_count,
233                        error = %e,
234                        "Failed to start manual distribution"
235                    );
236                    return Err(ActorError::Functional {
237                        description: format!(
238                            "Failed to start manual distribution: {}",
239                            e
240                        ),
241                    });
242                };
243
244                debug!(
245                    msg_type = "Update",
246                    subject_id = %subject_id,
247                    witnesses_count = witnesses_count,
248                    is_gov = is_gov,
249                    "Manual distribution started successfully"
250                );
251
252                Ok(())
253            }
254        }
255    }
256
257    async fn on_child_fault(
258        &mut self,
259        error: ActorError,
260        ctx: &mut ActorContext<Self>,
261    ) -> ChildAction {
262        error!(
263            error = %error,
264            "Child actor fault in manual distribution"
265        );
266        emit_fail(ctx, error).await;
267        ChildAction::Stop
268    }
269}