ave_core/manual_distribution/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 Namespace,
10 identity::{DigestIdentifier, PublicKey},
11};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span, warn};
14
15use crate::{
16 distribution::{Distribution, DistributionMessage, DistributionType},
17 governance::model::WitnessesData,
18 helpers::network::service::NetworkSender,
19 model::common::{
20 emit_fail,
21 node::i_can_send_last_ledger,
22 subject::{acquire_subject, get_gov, get_last_ledger_event},
23 },
24};
25
26pub struct ManualDistribution {
27 our_key: Arc<PublicKey>,
28}
29
30impl ManualDistribution {
31 pub const fn new(our_key: Arc<PublicKey>) -> Self {
32 Self { our_key }
33 }
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub enum ManualDistributionMessage {
38 Update(DigestIdentifier),
39}
40
41impl Message for ManualDistributionMessage {}
42
43impl NotPersistentActor for ManualDistribution {}
44
45#[async_trait]
46impl Actor for ManualDistribution {
47 type Message = ManualDistributionMessage;
48 type Event = ();
49 type Response = ();
50
51 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
52 parent_span.map_or_else(
53 || info_span!("ManualDistribution"),
54 |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
55 )
56 }
57}
58
59#[async_trait]
60impl Handler<Self> for ManualDistribution {
61 async fn handle_message(
62 &mut self,
63 _sender: ActorPath,
64 msg: ManualDistributionMessage,
65 ctx: &mut ave_actors::ActorContext<Self>,
66 ) -> Result<(), ActorError> {
67 match msg {
68 ManualDistributionMessage::Update(subject_id) => {
69 let data = i_can_send_last_ledger(ctx, &subject_id)
70 .await
71 .map_err(|e| {
72 error!(
73 msg_type = "Update",
74 subject_id = %subject_id,
75 error = %e,
76 "Failed to check if we can send last ledger"
77 );
78 e
79 })?;
80
81 let Some(data) = data else {
82 warn!(
83 msg_type = "Update",
84 subject_id = %subject_id,
85 "Not the owner of the subject nor rejected transfer"
86 );
87 return Err(ActorError::Functional {
88 description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
89 });
90 };
91
92 let is_tracker = data.get_governance_id().is_some();
93 let ledger = if is_tracker {
94 let lease = acquire_subject(
95 ctx,
96 &subject_id,
97 format!("manual_distribution:{}", subject_id),
98 None,
99 true,
100 )
101 .await?;
102 let ledger = get_last_ledger_event(ctx, &subject_id).await;
103 lease.finish(ctx).await?;
104 ledger
105 } else {
106 get_last_ledger_event(ctx, &subject_id).await
107 }
108 .map_err(|e| {
109 error!(
110 msg_type = "Update",
111 subject_id = %subject_id,
112 error = %e,
113 "Failed to get last ledger event"
114 );
115 e
116 })?;
117
118 let Some(ledger) = ledger else {
119 error!(
120 msg_type = "Update",
121 subject_id = %subject_id,
122 "No ledger event found for subject"
123 );
124 return Err(ActorError::Functional {
125 description: "Cannot obtain last ledger event"
126 .to_string(),
127 });
128 };
129
130 let governance_id =
131 data.get_governance_id().as_ref().map_or_else(
132 || subject_id.clone(),
133 |governance_id| governance_id.clone(),
134 );
135
136 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
137 error!(
138 msg_type = "Update",
139 subject_id = %subject_id,
140 governance_id = %governance_id,
141 error = %e,
142 "Failed to get governance"
143 );
144 e
145 })?;
146
147 let schema_id = data.get_schema_id();
148
149 let is_gov = schema_id.is_gov();
150 let witnesses_data = if is_gov {
151 WitnessesData::Gov
152 } else {
153 WitnessesData::Schema {
154 creator: (*self.our_key).clone(),
155 schema_id: schema_id.clone(),
156 namespace: Namespace::from(data.get_namespace()),
157 }
158 };
159
160 let mut witnesses =
161 gov.get_witnesses(witnesses_data).map_err(|e| {
162 error!(
163 msg_type = "Update",
164 subject_id = %subject_id,
165 is_gov = is_gov,
166 error = %e,
167 "Failed to get witnesses from governance"
168 );
169 ActorError::Functional {
170 description: e.to_string(),
171 }
172 })?;
173
174 witnesses.remove(&*self.our_key);
175 if witnesses.is_empty() {
176 warn!(
177 msg_type = "Update",
178 subject_id = %subject_id,
179 "No witnesses available for manual distribution"
180 );
181 return Err(ActorError::Functional {
182 description: "No witnesses available to manually send the last ledger event".to_string()
183 });
184 }
185
186 let witnesses_count = witnesses.len();
187
188 let Some(network) = ctx
189 .system()
190 .get_helper::<Arc<NetworkSender>>("network")
191 .await
192 else {
193 error!(
194 msg_type = "Update",
195 subject_id = %subject_id,
196 "Network helper not found"
197 );
198 return Err(ActorError::Helper {
199 name: "network".to_owned(),
200 reason: "Not found".to_owned(),
201 });
202 };
203
204 let distribution = Distribution::new(
205 network,
206 DistributionType::Manual,
207 DigestIdentifier::default(),
208 );
209
210 let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
211 warn!(
212 msg_type = "Update",
213 subject_id = %subject_id,
214 error = %e,
215 "Manual distribution already in progress"
216 );
217 ActorError::Functional {
218 description: "Manual distribution already in progress for this subject".to_owned()
219 }
220 })?;
221
222 if let Err(e) = distribution_actor
223 .tell(DistributionMessage::Create {
224 witnesses: witnesses.clone(),
225 ledger: Box::new(ledger),
226 })
227 .await
228 {
229 error!(
230 msg_type = "Update",
231 subject_id = %subject_id,
232 witnesses_count = witnesses_count,
233 error = %e,
234 "Failed to start manual distribution"
235 );
236 return Err(ActorError::Functional {
237 description: format!(
238 "Failed to start manual distribution: {}",
239 e
240 ),
241 });
242 };
243
244 debug!(
245 msg_type = "Update",
246 subject_id = %subject_id,
247 witnesses_count = witnesses_count,
248 is_gov = is_gov,
249 "Manual distribution started successfully"
250 );
251
252 Ok(())
253 }
254 }
255 }
256
257 async fn on_child_fault(
258 &mut self,
259 error: ActorError,
260 ctx: &mut ActorContext<Self>,
261 ) -> ChildAction {
262 error!(
263 error = %error,
264 "Child actor fault in manual distribution"
265 );
266 emit_fail(ctx, error).await;
267 ChildAction::Stop
268 }
269}