1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::collections::HashSet;
14use std::sync::Arc;
15use tracing::{Span, debug, error, info, info_span, warn};
16
17use crate::helpers::network::service::NetworkSender;
18use crate::model::common::node::get_subject_data;
19use crate::model::common::subject::{
20 get_gov, get_gov_sn, get_tracker_sn_creator,
21};
22use crate::node::SubjectData;
23use crate::update::UpdateType;
24use crate::{
25 ActorMessage, NetworkMessage,
26 db::Storable,
27 governance::model::WitnessesData,
28 model::common::emit_fail,
29 update::{Update, UpdateMessage, UpdateNew},
30};
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct Auth {
34 #[serde(skip)]
35 network: Option<Arc<NetworkSender>>,
36
37 #[serde(skip)]
38 our_key: Arc<PublicKey>,
39
40 auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
41}
42
43#[derive(
44 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
45)]
46pub enum AuthWitness {
47 One(PublicKey),
48 Many(Vec<PublicKey>),
49 None,
50}
51
52impl BorshSerialize for Auth {
53 fn serialize<W: std::io::Write>(
54 &self,
55 writer: &mut W,
56 ) -> std::io::Result<()> {
57 BorshSerialize::serialize(&self.auth, writer)?;
59
60 Ok(())
61 }
62}
63
64impl BorshDeserialize for Auth {
65 fn deserialize_reader<R: std::io::Read>(
66 reader: &mut R,
67 ) -> std::io::Result<Self> {
68 let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
70 let network = None;
71 let our_key = Arc::new(PublicKey::default());
72
73 Ok(Self {
74 network,
75 auth,
76 our_key,
77 })
78 }
79}
80
81impl Auth {
82 async fn build_update_data(
83 ctx: &mut ActorContext<Self>,
84 subject_id: &DigestIdentifier,
85 ) -> Result<(HashSet<PublicKey>, Option<u64>), ActorError> {
86 let data = get_subject_data(ctx, subject_id).await?;
87
88 let (witnesses, actual_sn) = if let Some(data) = &data {
89 match data {
90 SubjectData::Tracker {
91 governance_id,
92 schema_id,
93 namespace,
94 ..
95 } => {
96 if let Some((creator, sn)) =
97 get_tracker_sn_creator(ctx, governance_id, subject_id)
98 .await?
99 {
100 let gov = get_gov(ctx, governance_id).await?;
101 let witnesses = gov
102 .get_witnesses(WitnessesData::Schema {
103 creator,
104 schema_id: schema_id.clone(),
105 namespace: Namespace::from(
106 namespace.to_owned(),
107 ),
108 })
109 .map_err(|e| {
110 error!(
111 subject_id = %subject_id,
112 governance_id = %governance_id,
113 error = %e,
114 "Failed to get witnesses for tracker schema"
115 );
116 ActorError::Functional {
117 description: e.to_string(),
118 }
119 })?;
120
121 (witnesses, Some(sn))
122 } else {
123 (HashSet::default(), None)
124 }
125 }
126 SubjectData::Governance { .. } => {
127 let gov = get_gov(ctx, subject_id).await?;
128 let witnesses =
129 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
130 warn!(
131 subject_id = %subject_id,
132 error = %e,
133 "Failed to get witnesses for governance"
134 );
135 ActorError::Functional {
136 description: e.to_string(),
137 }
138 })?;
139
140 let sn = get_gov_sn(ctx, subject_id).await?;
141
142 (witnesses, Some(sn))
143 }
144 }
145 } else {
146 (HashSet::default(), None)
147 };
148
149 Ok((witnesses, actual_sn))
150 }
151}
152
153#[derive(Debug, Clone)]
154pub enum AuthMessage {
155 NewAuth {
156 subject_id: DigestIdentifier,
157 witness: AuthWitness,
158 },
159 GetAuths,
160 GetAuth {
161 subject_id: DigestIdentifier,
162 },
163 DeleteAuth {
164 subject_id: DigestIdentifier,
165 },
166 Update {
167 subject_id: DigestIdentifier,
168 objective: Option<PublicKey>,
169 },
170}
171
172impl Message for AuthMessage {}
173
174#[derive(Debug, Clone)]
175pub enum AuthResponse {
176 Auths { subjects: Vec<DigestIdentifier> },
177 Witnesses(HashSet<PublicKey>),
178 None,
179}
180
181impl Response for AuthResponse {}
182
183#[derive(
184 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
185)]
186pub enum AuthEvent {
187 NewAuth {
188 subject_id: DigestIdentifier,
189 witness: AuthWitness,
190 },
191 DeleteAuth {
192 subject_id: DigestIdentifier,
193 },
194}
195
196impl Event for AuthEvent {}
197
198#[async_trait]
199impl Actor for Auth {
200 type Event = AuthEvent;
201 type Message = AuthMessage;
202 type Response = AuthResponse;
203
204 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
205 parent_span.map_or_else(
206 || info_span!("Auth"),
207 |parent_span| info_span!(parent: parent_span, "Auth"),
208 )
209 }
210
211 async fn pre_start(
212 &mut self,
213 ctx: &mut ActorContext<Self>,
214 ) -> Result<(), ActorError> {
215 if let Err(e) = self.init_store("auth", None, false, ctx).await {
216 error!(
217 error = %e,
218 "Failed to initialize auth store"
219 );
220 return Err(e);
221 }
222 Ok(())
223 }
224}
225
226#[async_trait]
227impl Handler<Self> for Auth {
228 async fn handle_message(
229 &mut self,
230 _sender: ActorPath,
231 msg: AuthMessage,
232 ctx: &mut ave_actors::ActorContext<Self>,
233 ) -> Result<AuthResponse, ActorError> {
234 match msg {
235 AuthMessage::GetAuth { subject_id } => {
236 if let Some(witnesses) = self.auth.get(&subject_id) {
237 debug!(
238 msg_type = "GetAuth",
239 subject_id = %subject_id,
240 "Retrieved auth witnesses"
241 );
242
243 return Ok(AuthResponse::Witnesses(witnesses.clone()));
244 } else {
245 return Err(ActorError::Functional {
246 description: "The subject has not been authorized"
247 .to_owned(),
248 });
249 }
250 }
251 AuthMessage::DeleteAuth { subject_id } => {
252 self.on_event(
253 AuthEvent::DeleteAuth {
254 subject_id: subject_id.clone(),
255 },
256 ctx,
257 )
258 .await;
259
260 debug!(
261 msg_type = "DeleteAuth",
262 subject_id = %subject_id,
263 "Auth deleted successfully"
264 );
265 }
266 AuthMessage::NewAuth {
267 subject_id,
268 witness,
269 } => {
270 if !subject_id.is_empty() {
271 self.on_event(
272 AuthEvent::NewAuth {
273 subject_id: subject_id.clone(),
274 witness,
275 },
276 ctx,
277 )
278 .await;
279
280 debug!(
281 msg_type = "NewAuth",
282 subject_id = %subject_id,
283 "New auth created successfully"
284 );
285 }
286 }
287 AuthMessage::GetAuths => {
288 let subjects: Vec<DigestIdentifier> =
289 self.auth.keys().cloned().collect();
290 debug!(
291 msg_type = "GetAuths",
292 count = subjects.len(),
293 "Retrieved all authorized subjects"
294 );
295 return Ok(AuthResponse::Auths { subjects });
296 }
297 AuthMessage::Update {
298 subject_id,
299 objective,
300 } => {
301 let Some(network) = self.network.clone() else {
302 error!(
303 msg_type = "Update",
304 subject_id = %subject_id,
305 "Network is none"
306 );
307 return Err(ActorError::FunctionalCritical {
308 description: "network is none".to_string(),
309 });
310 };
311
312 let (witnesses, actual_sn) = {
313 let (mut witnesses, actual_sn) =
314 Self::build_update_data(ctx, &subject_id).await?;
315
316 if let Some(witness) = objective {
317 witnesses.insert(witness);
318 }
319
320 let auth_witnesses =
321 self.auth.get(&subject_id).cloned().unwrap_or_default();
322
323 let mut witnesses = witnesses
324 .union(&auth_witnesses)
325 .cloned()
326 .collect::<HashSet<PublicKey>>();
327 witnesses.remove(&self.our_key);
328
329 (witnesses, actual_sn)
330 };
331
332 if witnesses.is_empty() {
333 warn!(
334 msg_type = "Update",
335 subject_id = %subject_id,
336 "Subject has no witnesses to ask for update"
337 );
338 return Err(ActorError::Functional {
339 description: "The subject has no witnesses to try to ask for an update".to_owned(),
340 });
341 } else if witnesses.len() == 1 {
342 let objetive = witnesses.iter().next().expect("len is 1");
343 let info = ComunicateInfo {
344 receiver: objetive.clone(),
345 request_id: String::default(),
346 version: 0,
347 receiver_actor: format!(
348 "/user/node/distributor_{}",
349 subject_id
350 ),
351 };
352
353 if let Err(e) = network
354 .send_command(network::CommandHelper::SendMessage {
355 message: NetworkMessage {
356 info,
357 message: ActorMessage::DistributionLedgerReq {
358 actual_sn,
359 subject_id: subject_id.clone(),
360 },
361 },
362 })
363 .await
364 {
365 error!(
366 msg_type = "Update",
367 subject_id = %subject_id,
368 error = %e,
369 "Cannot send response to network"
370 );
371 return Err(emit_fail(ctx, e).await);
372 };
373
374 debug!(
375 msg_type = "Update",
376 subject_id = %subject_id,
377 "Update message sent to single witness"
378 );
379 } else {
380 let data = UpdateNew {
381 network,
382 subject_id: subject_id.clone(),
383 our_sn: actual_sn,
384 witnesses,
385 update_type: UpdateType::Auth,
386 };
387
388 let updater = Update::new(data);
389 if let Ok(child) =
390 ctx.create_child(&subject_id.to_string(), updater).await
391 {
392 if let Err(e) = child.tell(UpdateMessage::Run).await {
393 error!(
394 msg_type = "Update",
395 subject_id = %subject_id,
396 error = %e,
397 "Failed to send Run message to update actor"
398 );
399 return Err(emit_fail(ctx, e).await);
400 }
401
402 debug!(
403 msg_type = "Update",
404 subject_id = %subject_id,
405 "Update process initiated with multiple witnesses"
406 );
407 } else {
408 info!(
409 msg_type = "Update",
410 subject_id = %subject_id,
411 "An update is already in progress."
412 );
413 };
414 }
415 }
416 };
417
418 Ok(AuthResponse::None)
419 }
420
421 async fn on_event(
422 &mut self,
423 event: AuthEvent,
424 ctx: &mut ActorContext<Self>,
425 ) {
426 if let Err(e) = self.persist(&event, ctx).await {
427 error!(
428 event = ?event,
429 error = %e,
430 "Failed to persist auth event"
431 );
432 emit_fail(ctx, e).await;
433 }
434 }
435
436 async fn on_child_fault(
437 &mut self,
438 error: ActorError,
439 ctx: &mut ActorContext<Self>,
440 ) -> ChildAction {
441 error!(
442 error = %error,
443 "Child actor fault in auth"
444 );
445 emit_fail(ctx, error).await;
446 ChildAction::Stop
447 }
448}
449
450#[async_trait]
451impl PersistentActor for Auth {
452 type Persistence = LightPersistence;
453 type InitParams = (Arc<NetworkSender>, Arc<PublicKey>);
454
455 fn update(&mut self, state: Self) {
456 self.auth = state.auth;
457 }
458
459 fn create_initial(params: Self::InitParams) -> Self {
460 Self {
461 network: Some(params.0),
462 auth: HashMap::new(),
463 our_key: params.1,
464 }
465 }
466
467 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
469 match event {
470 AuthEvent::NewAuth {
471 subject_id,
472 witness,
473 } => {
474 let witnesses = match witness {
475 AuthWitness::One(public_key) => {
476 HashSet::from([public_key.clone()])
477 }
478 AuthWitness::Many(items) => items.iter().cloned().collect(),
479 AuthWitness::None => HashSet::default(),
480 };
481
482 self.auth.insert(subject_id.clone(), witnesses);
483 debug!(
484 event_type = "NewAuth",
485 subject_id = %subject_id,
486 "Applied new auth"
487 );
488 }
489 AuthEvent::DeleteAuth { subject_id } => {
490 self.auth.remove(subject_id);
491 debug!(
492 event_type = "DeleteAuth",
493 subject_id = %subject_id,
494 "Applied auth deletion"
495 );
496 }
497 };
498
499 Ok(())
500 }
501}
502
503#[async_trait]
504impl Storable for Auth {}