1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::collections::HashSet;
13use std::sync::Arc;
14use tracing::{Span, debug, error, info, info_span, warn};
15
16use crate::helpers::network::service::NetworkSender;
17use crate::model::common::node::get_subject_data;
18use crate::model::common::subject::{
19 get_gov, get_gov_sn, get_tracker_sn_owner,
20};
21use crate::node::SubjectData;
22use crate::update::UpdateType;
23use crate::{
24 db::Storable,
25 governance::model::WitnessesData,
26 model::common::emit_fail,
27 update::{Update, UpdateMessage, UpdateNew, UpdateSubjectKind},
28};
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct Auth {
32 #[serde(skip)]
33 network: Option<Arc<NetworkSender>>,
34
35 #[serde(skip)]
36 our_key: Arc<PublicKey>,
37
38 #[serde(skip)]
39 round_retry_interval_secs: u64,
40
41 #[serde(skip)]
42 max_round_retries: usize,
43
44 #[serde(skip)]
45 witness_retry_count: usize,
46
47 #[serde(skip)]
48 witness_retry_interval_secs: u64,
49
50 auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
51}
52
53#[derive(Clone, Debug)]
54pub struct AuthInitParams {
55 pub network: Arc<NetworkSender>,
56 pub our_key: Arc<PublicKey>,
57 pub round_retry_interval_secs: u64,
58 pub max_round_retries: usize,
59 pub witness_retry_count: usize,
60 pub witness_retry_interval_secs: u64,
61}
62
63#[derive(
64 Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
65)]
66pub enum AuthWitness {
67 One(PublicKey),
68 Many(Vec<PublicKey>),
69 None,
70}
71
72impl BorshSerialize for Auth {
73 fn serialize<W: std::io::Write>(
74 &self,
75 writer: &mut W,
76 ) -> std::io::Result<()> {
77 BorshSerialize::serialize(&self.auth, writer)?;
79
80 Ok(())
81 }
82}
83
84impl BorshDeserialize for Auth {
85 fn deserialize_reader<R: std::io::Read>(
86 reader: &mut R,
87 ) -> std::io::Result<Self> {
88 let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
90 let network = None;
91 let our_key = Arc::new(PublicKey::default());
92
93 Ok(Self {
94 network,
95 auth,
96 our_key,
97 round_retry_interval_secs: 10,
98 max_round_retries: 3,
99 witness_retry_count: 3,
100 witness_retry_interval_secs: 10,
101 })
102 }
103}
104
105impl Auth {
106 async fn build_update_state(
107 ctx: &mut ActorContext<Self>,
108 subject_id: &DigestIdentifier,
109 ) -> Result<(Option<u64>, Option<UpdateSubjectKind>), ActorError> {
110 let data = get_subject_data(ctx, subject_id).await?;
111
112 match data {
113 Some(SubjectData::Tracker { governance_id, .. }) => {
114 let actual_sn = get_tracker_sn_owner(
115 ctx,
116 &governance_id,
117 subject_id,
118 )
119 .await?
120 .map(|(_, actual_sn)| actual_sn);
121
122 Ok((actual_sn, Some(UpdateSubjectKind::Tracker)))
123 }
124 Some(SubjectData::Governance { .. }) => {
125 let sn = get_gov_sn(ctx, subject_id).await?;
126 Ok((Some(sn), Some(UpdateSubjectKind::Governance)))
127 }
128 None => Ok((None, None)),
129 }
130 }
131
132 async fn build_update_data(
133 ctx: &mut ActorContext<Self>,
134 subject_id: &DigestIdentifier,
135 ) -> Result<
136 (HashSet<PublicKey>, Option<u64>, Option<UpdateSubjectKind>),
137 ActorError,
138 > {
139 let data = get_subject_data(ctx, subject_id).await?;
140
141 let (witnesses, actual_sn, subject_kind_hint) = if let Some(data) =
142 &data
143 {
144 match data {
145 SubjectData::Tracker {
146 governance_id,
147 schema_id,
148 namespace,
149 ..
150 } => {
151 if let Some((owner, actual_sn)) =
152 get_tracker_sn_owner(ctx, governance_id, subject_id)
153 .await?
154 {
155 let gov = get_gov(ctx, governance_id).await?;
156 let witnesses = gov
157 .get_witnesses(WitnessesData::Schema {
158 creator: owner,
159 schema_id: schema_id.clone(),
160 namespace: Namespace::from(
161 namespace.to_owned(),
162 ),
163 })
164 .map_err(|e| {
165 error!(
166 subject_id = %subject_id,
167 governance_id = %governance_id,
168 error = %e,
169 "Failed to get witnesses for tracker schema"
170 );
171 ActorError::Functional {
172 description: e.to_string(),
173 }
174 })?;
175
176 (
177 witnesses,
178 Some(actual_sn),
179 Some(UpdateSubjectKind::Tracker),
180 )
181 } else {
182 (
183 HashSet::default(),
184 None,
185 Some(UpdateSubjectKind::Tracker),
186 )
187 }
188 }
189 SubjectData::Governance { .. } => {
190 let gov = get_gov(ctx, subject_id).await?;
191 let witnesses =
192 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
193 warn!(
194 subject_id = %subject_id,
195 error = %e,
196 "Failed to get witnesses for governance"
197 );
198 ActorError::Functional {
199 description: e.to_string(),
200 }
201 })?;
202
203 let sn = get_gov_sn(ctx, subject_id).await?;
204
205 (witnesses, Some(sn), Some(UpdateSubjectKind::Governance))
206 }
207 }
208 } else {
209 (HashSet::default(), None, None)
210 };
211
212 Ok((witnesses, actual_sn, subject_kind_hint))
213 }
214}
215
216#[derive(Debug, Clone)]
217pub enum AuthMessage {
218 NewAuth {
219 subject_id: DigestIdentifier,
220 witness: AuthWitness,
221 },
222 GetAuths,
223 GetAuth {
224 subject_id: DigestIdentifier,
225 },
226 DeleteAuth {
227 subject_id: DigestIdentifier,
228 },
229 Update {
230 subject_id: DigestIdentifier,
231 objective: Option<PublicKey>,
232 strict: bool,
233 },
234}
235
236impl Message for AuthMessage {}
237
238#[derive(Debug, Clone)]
239pub enum AuthResponse {
240 Auths { subjects: Vec<DigestIdentifier> },
241 Witnesses(HashSet<PublicKey>),
242 None,
243}
244
245impl Response for AuthResponse {}
246
247#[derive(
248 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
249)]
250pub enum AuthEvent {
251 NewAuth {
252 subject_id: DigestIdentifier,
253 witness: AuthWitness,
254 },
255 DeleteAuth {
256 subject_id: DigestIdentifier,
257 },
258}
259
260impl Event for AuthEvent {}
261
262#[async_trait]
263impl Actor for Auth {
264 type Event = AuthEvent;
265 type Message = AuthMessage;
266 type Response = AuthResponse;
267
268 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
269 parent_span.map_or_else(
270 || info_span!("Auth"),
271 |parent_span| info_span!(parent: parent_span, "Auth"),
272 )
273 }
274
275 async fn pre_start(
276 &mut self,
277 ctx: &mut ActorContext<Self>,
278 ) -> Result<(), ActorError> {
279 if let Err(e) = self.init_store("auth", None, false, ctx).await {
280 error!(
281 error = %e,
282 "Failed to initialize auth store"
283 );
284 return Err(e);
285 }
286
287 Ok(())
288 }
289}
290
291#[async_trait]
292impl Handler<Self> for Auth {
293 async fn handle_message(
294 &mut self,
295 _sender: ActorPath,
296 msg: AuthMessage,
297 ctx: &mut ave_actors::ActorContext<Self>,
298 ) -> Result<AuthResponse, ActorError> {
299 match msg {
300 AuthMessage::GetAuth { subject_id } => {
301 if let Some(witnesses) = self.auth.get(&subject_id) {
302 debug!(
303 msg_type = "GetAuth",
304 subject_id = %subject_id,
305 "Retrieved auth witnesses"
306 );
307
308 return Ok(AuthResponse::Witnesses(witnesses.clone()));
309 } else {
310 debug!(
311 msg_type = "GetAuth",
312 subject_id = %subject_id,
313 "Subject is not authorized"
314 );
315 return Err(ActorError::Functional {
316 description: "The subject has not been authorized"
317 .to_owned(),
318 });
319 }
320 }
321 AuthMessage::DeleteAuth { subject_id } => {
322 self.on_event(
323 AuthEvent::DeleteAuth {
324 subject_id: subject_id.clone(),
325 },
326 ctx,
327 )
328 .await;
329
330 debug!(
331 msg_type = "DeleteAuth",
332 subject_id = %subject_id,
333 "Auth deleted successfully"
334 );
335 }
336 AuthMessage::NewAuth {
337 subject_id,
338 witness,
339 } => {
340 if !subject_id.is_empty() {
341 self.on_event(
342 AuthEvent::NewAuth {
343 subject_id: subject_id.clone(),
344 witness,
345 },
346 ctx,
347 )
348 .await;
349
350 debug!(
351 msg_type = "NewAuth",
352 subject_id = %subject_id,
353 "New auth created successfully"
354 );
355 } else {
356 warn!(
357 msg_type = "NewAuth",
358 witness = ?witness,
359 "Ignoring auth creation with empty subject_id"
360 );
361 }
362 }
363 AuthMessage::GetAuths => {
364 let subjects: Vec<DigestIdentifier> =
365 self.auth.keys().cloned().collect();
366 debug!(
367 msg_type = "GetAuths",
368 count = subjects.len(),
369 "Retrieved all authorized subjects"
370 );
371 return Ok(AuthResponse::Auths { subjects });
372 }
373 AuthMessage::Update {
374 subject_id,
375 objective,
376 strict,
377 } => {
378 let Some(network) = self.network.clone() else {
379 error!(
380 msg_type = "Update",
381 subject_id = %subject_id,
382 "Network is none"
383 );
384 return Err(ActorError::FunctionalCritical {
385 description: "network is none".to_string(),
386 });
387 };
388
389 let (witnesses, actual_sn, subject_kind_hint) = {
390 let auth_witnesses =
391 self.auth.get(&subject_id).cloned().unwrap_or_default();
392 let (mut witnesses, actual_sn, subject_kind_hint) =
393 if strict {
394 let (actual_sn, subject_kind_hint) =
395 Self::build_update_state(ctx, &subject_id)
396 .await?;
397 (auth_witnesses, actual_sn, subject_kind_hint)
398 } else {
399 let (
400 mut governance_witnesses,
401 actual_sn,
402 subject_kind_hint,
403 ) = Self::build_update_data(ctx, &subject_id)
404 .await?;
405
406 if let Some(witness) = objective {
407 governance_witnesses.insert(witness);
408 }
409
410 (
411 governance_witnesses
412 .union(&auth_witnesses)
413 .cloned()
414 .collect::<HashSet<PublicKey>>(),
415 actual_sn,
416 subject_kind_hint,
417 )
418 };
419 witnesses.remove(&self.our_key);
420
421 (witnesses, actual_sn, subject_kind_hint)
422 };
423
424 if witnesses.is_empty() {
425 warn!(
426 msg_type = "Update",
427 subject_id = %subject_id,
428 "Subject has no witnesses to ask for update"
429 );
430 return Err(ActorError::Functional {
431 description: "The subject has no witnesses to try to ask for an update".to_owned(),
432 });
433 } else {
434 let data = UpdateNew {
435 network,
436 subject_id: subject_id.clone(),
437 our_sn: actual_sn,
438 witnesses,
439 update_type: UpdateType::Auth,
440 subject_kind_hint,
441 round_retry_interval_secs: self
442 .round_retry_interval_secs,
443 max_round_retries: self.max_round_retries,
444 witness_retry_count: self.witness_retry_count,
445 witness_retry_interval_secs: self
446 .witness_retry_interval_secs,
447 };
448
449 let updater = Update::new(data);
450 if let Ok(child) =
451 ctx.create_child(&subject_id.to_string(), updater).await
452 {
453 if let Err(e) = child.tell(UpdateMessage::Run).await {
454 error!(
455 msg_type = "Update",
456 subject_id = %subject_id,
457 error = %e,
458 "Failed to send Run message to update actor"
459 );
460 return Err(emit_fail(ctx, e).await);
461 }
462
463 debug!(
464 msg_type = "Update",
465 subject_id = %subject_id,
466 "Update process initiated with multiple witnesses"
467 );
468 } else {
469 info!(
470 msg_type = "Update",
471 subject_id = %subject_id,
472 "An update is already in progress."
473 );
474 };
475 }
476 }
477 };
478
479 Ok(AuthResponse::None)
480 }
481
482 async fn on_event(
483 &mut self,
484 event: AuthEvent,
485 ctx: &mut ActorContext<Self>,
486 ) {
487 if let Err(e) = self.persist(&event, ctx).await {
488 error!(
489 event = ?event,
490 error = %e,
491 "Failed to persist auth event"
492 );
493 emit_fail(ctx, e).await;
494 }
495 }
496
497 async fn on_child_fault(
498 &mut self,
499 error: ActorError,
500 ctx: &mut ActorContext<Self>,
501 ) -> ChildAction {
502 error!(
503 error = %error,
504 "Child actor fault in auth"
505 );
506 emit_fail(ctx, error).await;
507 ChildAction::Stop
508 }
509}
510
511#[async_trait]
512impl PersistentActor for Auth {
513 type Persistence = LightPersistence;
514 type InitParams = AuthInitParams;
515
516 fn update(&mut self, state: Self) {
517 self.auth = state.auth;
518 }
519
520 fn create_initial(params: Self::InitParams) -> Self {
521 Self {
522 network: Some(params.network),
523 auth: HashMap::new(),
524 our_key: params.our_key,
525 round_retry_interval_secs: params.round_retry_interval_secs,
526 max_round_retries: params.max_round_retries,
527 witness_retry_count: params.witness_retry_count,
528 witness_retry_interval_secs: params.witness_retry_interval_secs,
529 }
530 }
531
532 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
534 match event {
535 AuthEvent::NewAuth {
536 subject_id,
537 witness,
538 } => {
539 let witnesses = match witness {
540 AuthWitness::One(public_key) => {
541 HashSet::from([public_key.clone()])
542 }
543 AuthWitness::Many(items) => items.iter().cloned().collect(),
544 AuthWitness::None => HashSet::default(),
545 };
546
547 self.auth.insert(subject_id.clone(), witnesses);
548 debug!(
549 event_type = "NewAuth",
550 subject_id = %subject_id,
551 "Applied new auth"
552 );
553 }
554 AuthEvent::DeleteAuth { subject_id } => {
555 self.auth.remove(subject_id);
556 debug!(
557 event_type = "DeleteAuth",
558 subject_id = %subject_id,
559 "Applied auth deletion"
560 );
561 }
562 };
563
564 Ok(())
565 }
566}
567
568#[async_trait]
569impl Storable for Auth {}