ave_core/manual_distribution/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 Namespace,
10 identity::{DigestIdentifier, PublicKey},
11};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span, warn};
14
15use crate::{
16 distribution::{Distribution, DistributionMessage, DistributionType},
17 governance::model::WitnessesData,
18 helpers::network::service::NetworkSender,
19 model::common::{
20 emit_fail,
21 node::i_can_send_last_ledger,
22 subject::{get_gov, get_last_ledger_event},
23 },
24};
25
26pub struct ManualDistribution {
27 our_key: Arc<PublicKey>,
28}
29
30impl ManualDistribution {
31 pub const fn new(our_key: Arc<PublicKey>) -> Self {
32 Self { our_key }
33 }
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub enum ManualDistributionMessage {
38 Update(DigestIdentifier),
39}
40
41impl Message for ManualDistributionMessage {}
42
43impl NotPersistentActor for ManualDistribution {}
44
45#[async_trait]
46impl Actor for ManualDistribution {
47 type Message = ManualDistributionMessage;
48 type Event = ();
49 type Response = ();
50
51 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
52 parent_span.map_or_else(
53 || info_span!("ManualDistribution"),
54 |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
55 )
56 }
57}
58
59#[async_trait]
60impl Handler<Self> for ManualDistribution {
61 async fn handle_message(
62 &mut self,
63 _sender: ActorPath,
64 msg: ManualDistributionMessage,
65 ctx: &mut ave_actors::ActorContext<Self>,
66 ) -> Result<(), ActorError> {
67 match msg {
68 ManualDistributionMessage::Update(subject_id) => {
69 let data = i_can_send_last_ledger(ctx, &subject_id)
70 .await
71 .map_err(|e| {
72 error!(
73 msg_type = "Update",
74 subject_id = %subject_id,
75 error = %e,
76 "Failed to check if we can send last ledger"
77 );
78 e
79 })?;
80
81 let Some(data) = data else {
82 warn!(
83 msg_type = "Update",
84 subject_id = %subject_id,
85 "Not the owner of the subject nor rejected transfer"
86 );
87 return Err(ActorError::Functional {
88 description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
89 });
90 };
91
92 let ledger = get_last_ledger_event(ctx, &subject_id)
93 .await
94 .map_err(|e| {
95 error!(
96 msg_type = "Update",
97 subject_id = %subject_id,
98 error = %e,
99 "Failed to get last ledger event"
100 );
101 e
102 })?;
103
104 let Some(ledger) = ledger else {
105 error!(
106 msg_type = "Update",
107 subject_id = %subject_id,
108 "No ledger event found for subject"
109 );
110 return Err(ActorError::Functional {
111 description: "Cannot obtain last ledger event"
112 .to_string(),
113 });
114 };
115
116 let governance_id =
117 data.get_governance_id().as_ref().map_or_else(
118 || subject_id.clone(),
119 |governance_id| governance_id.clone(),
120 );
121
122 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
123 error!(
124 msg_type = "Update",
125 subject_id = %subject_id,
126 governance_id = %governance_id,
127 error = %e,
128 "Failed to get governance"
129 );
130 e
131 })?;
132
133 let schema_id = data.get_schema_id();
134
135 let is_gov = schema_id.is_gov();
136 let witnesses_data = if is_gov {
137 WitnessesData::Gov
138 } else {
139 WitnessesData::Schema {
140 creator: (*self.our_key).clone(),
141 schema_id: schema_id.clone(),
142 namespace: Namespace::from(data.get_namespace()),
143 }
144 };
145
146 let mut witnesses =
147 gov.get_witnesses(witnesses_data).map_err(|e| {
148 error!(
149 msg_type = "Update",
150 subject_id = %subject_id,
151 is_gov = is_gov,
152 error = %e,
153 "Failed to get witnesses from governance"
154 );
155 ActorError::Functional {
156 description: e.to_string(),
157 }
158 })?;
159
160 witnesses.remove(&*self.our_key);
161 if witnesses.is_empty() {
162 warn!(
163 msg_type = "Update",
164 subject_id = %subject_id,
165 "No witnesses available for manual distribution"
166 );
167 return Err(ActorError::Functional {
168 description: "No witnesses available to manually send the last ledger event".to_string()
169 });
170 }
171
172 let witnesses_count = witnesses.len();
173
174 let Some(network) = ctx
175 .system()
176 .get_helper::<Arc<NetworkSender>>("network")
177 .await
178 else {
179 error!(
180 msg_type = "Update",
181 subject_id = %subject_id,
182 "Network helper not found"
183 );
184 return Err(ActorError::Helper {
185 name: "network".to_owned(),
186 reason: "Not found".to_owned(),
187 });
188 };
189
190 let distribution = Distribution::new(
191 network,
192 DistributionType::Manual,
193 DigestIdentifier::default(),
194 );
195
196 let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
197 warn!(
198 msg_type = "Update",
199 subject_id = %subject_id,
200 error = %e,
201 "Manual distribution already in progress"
202 );
203 ActorError::Functional {
204 description: "Manual distribution already in progress for this subject".to_owned()
205 }
206 })?;
207
208 if let Err(e) = distribution_actor
209 .tell(DistributionMessage::Create {
210 witnesses: witnesses.clone(),
211 ledger: Box::new(ledger),
212 })
213 .await
214 {
215 error!(
216 msg_type = "Update",
217 subject_id = %subject_id,
218 witnesses_count = witnesses_count,
219 error = %e,
220 "Failed to start manual distribution"
221 );
222 return Err(ActorError::Functional {
223 description: format!(
224 "Failed to start manual distribution: {}",
225 e
226 ),
227 });
228 };
229
230 debug!(
231 msg_type = "Update",
232 subject_id = %subject_id,
233 witnesses_count = witnesses_count,
234 is_gov = is_gov,
235 "Manual distribution started successfully"
236 );
237
238 Ok(())
239 }
240 }
241 }
242
243 async fn on_child_fault(
244 &mut self,
245 error: ActorError,
246 ctx: &mut ActorContext<Self>,
247 ) -> ChildAction {
248 error!(
249 error = %error,
250 "Child actor fault in manual distribution"
251 );
252 emit_fail(ctx, error).await;
253 ChildAction::Stop
254 }
255}