1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::collections::HashSet;
13use std::sync::Arc;
14use tracing::{Span, debug, error, info, info_span, warn};
15
16use crate::helpers::network::service::NetworkSender;
17use crate::model::common::node::get_subject_data;
18use crate::model::common::subject::{
19 get_gov, get_gov_sn, get_tracker_sn_owner,
20};
21use crate::node::SubjectData;
22use crate::update::UpdateType;
23use crate::{
24 db::Storable,
25 governance::model::WitnessesData,
26 model::common::emit_fail,
27 update::{Update, UpdateMessage, UpdateNew, UpdateSubjectKind},
28};
29
/// State of the auth actor: for every authorized subject, the set of witness
/// public keys recorded for it.
///
/// Only `auth` takes part in serde (de)serialization; every other field is
/// marked `#[serde(skip)]`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Auth {
    /// Network sender handed to the update child actor when an update is
    /// requested. The `Update` message handler fails when this is `None`.
    #[serde(skip)]
    network: Option<Arc<NetworkSender>>,

    /// Key that is removed from the witness set before an update is requested
    /// (presumably this node's own public key — confirm with the caller).
    #[serde(skip)]
    our_key: Arc<PublicKey>,

    /// Forwarded unchanged to the update child actor.
    #[serde(skip)]
    round_retry_interval_secs: u64,

    /// Forwarded unchanged to the update child actor.
    #[serde(skip)]
    max_round_retries: usize,

    /// Forwarded unchanged to the update child actor.
    #[serde(skip)]
    witness_retry_count: usize,

    /// Forwarded unchanged to the update child actor.
    #[serde(skip)]
    witness_retry_interval_secs: u64,

    /// Authorized subjects, keyed by subject identifier, with the witness
    /// keys recorded for each one.
    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
}
52
/// Parameters used to build a fresh [`Auth`] actor; each field is copied into
/// the `Auth` field of the same name (`network` is wrapped in `Some`).
#[derive(Clone, Debug)]
pub struct AuthInitParams {
    /// Network sender stored in the actor and later passed to update actors.
    pub network: Arc<NetworkSender>,
    /// Key stored as `Auth::our_key`.
    pub our_key: Arc<PublicKey>,
    /// Value stored as `Auth::round_retry_interval_secs`.
    pub round_retry_interval_secs: u64,
    /// Value stored as `Auth::max_round_retries`.
    pub max_round_retries: usize,
    /// Value stored as `Auth::witness_retry_count`.
    pub witness_retry_count: usize,
    /// Value stored as `Auth::witness_retry_interval_secs`.
    pub witness_retry_interval_secs: u64,
}
62
/// Witness specification carried by a new authorization.
///
/// When the corresponding event is applied, the value is turned into the set
/// of keys stored for the subject.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthWitness {
    /// A single witness key; stored as a one-element set.
    One(PublicKey),
    /// Several witness keys; collected into a set, so duplicates collapse.
    Many(Vec<PublicKey>),
    /// No explicit witnesses; stored as an empty set.
    None,
}
71
72impl BorshSerialize for Auth {
73 fn serialize<W: std::io::Write>(
74 &self,
75 writer: &mut W,
76 ) -> std::io::Result<()> {
77 BorshSerialize::serialize(&self.auth, writer)?;
79
80 Ok(())
81 }
82}
83
84impl BorshDeserialize for Auth {
85 fn deserialize_reader<R: std::io::Read>(
86 reader: &mut R,
87 ) -> std::io::Result<Self> {
88 let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
90 let network = None;
91 let our_key = Arc::new(PublicKey::default());
92
93 Ok(Self {
94 network,
95 auth,
96 our_key,
97 round_retry_interval_secs: 10,
98 max_round_retries: 3,
99 witness_retry_count: 3,
100 witness_retry_interval_secs: 10,
101 })
102 }
103}
104
105impl Auth {
106 async fn build_update_state(
107 ctx: &mut ActorContext<Self>,
108 subject_id: &DigestIdentifier,
109 ) -> Result<(Option<u64>, Option<UpdateSubjectKind>), ActorError> {
110 let data = get_subject_data(ctx, subject_id).await?;
111
112 match data {
113 Some(SubjectData::Tracker { governance_id, .. }) => {
114 let actual_sn =
115 get_tracker_sn_owner(ctx, &governance_id, subject_id)
116 .await?
117 .map(|(_, actual_sn)| actual_sn);
118
119 Ok((actual_sn, Some(UpdateSubjectKind::Tracker)))
120 }
121 Some(SubjectData::Governance { .. }) => {
122 let sn = get_gov_sn(ctx, subject_id).await?;
123 Ok((Some(sn), Some(UpdateSubjectKind::Governance)))
124 }
125 None => Ok((None, None)),
126 }
127 }
128
129 async fn build_update_data(
130 ctx: &mut ActorContext<Self>,
131 subject_id: &DigestIdentifier,
132 ) -> Result<
133 (HashSet<PublicKey>, Option<u64>, Option<UpdateSubjectKind>),
134 ActorError,
135 > {
136 let data = get_subject_data(ctx, subject_id).await?;
137
138 let (witnesses, actual_sn, subject_kind_hint) = if let Some(data) =
139 &data
140 {
141 match data {
142 SubjectData::Tracker {
143 governance_id,
144 schema_id,
145 namespace,
146 ..
147 } => {
148 if let Some((owner, actual_sn)) =
149 get_tracker_sn_owner(ctx, governance_id, subject_id)
150 .await?
151 {
152 let gov = get_gov(ctx, governance_id).await?;
153 let witnesses = gov
154 .get_witnesses(WitnessesData::Schema {
155 creator: owner,
156 schema_id: schema_id.clone(),
157 namespace: Namespace::from(
158 namespace.to_owned(),
159 ),
160 })
161 .map_err(|e| {
162 error!(
163 subject_id = %subject_id,
164 governance_id = %governance_id,
165 error = %e,
166 "Failed to get witnesses for tracker schema"
167 );
168 ActorError::Functional {
169 description: e.to_string(),
170 }
171 })?;
172
173 (
174 witnesses,
175 Some(actual_sn),
176 Some(UpdateSubjectKind::Tracker),
177 )
178 } else {
179 (
180 HashSet::default(),
181 None,
182 Some(UpdateSubjectKind::Tracker),
183 )
184 }
185 }
186 SubjectData::Governance { .. } => {
187 let gov = get_gov(ctx, subject_id).await?;
188 let witnesses =
189 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
190 warn!(
191 subject_id = %subject_id,
192 error = %e,
193 "Failed to get witnesses for governance"
194 );
195 ActorError::Functional {
196 description: e.to_string(),
197 }
198 })?;
199
200 let sn = get_gov_sn(ctx, subject_id).await?;
201
202 (witnesses, Some(sn), Some(UpdateSubjectKind::Governance))
203 }
204 }
205 } else {
206 (HashSet::default(), None, None)
207 };
208
209 Ok((witnesses, actual_sn, subject_kind_hint))
210 }
211}
212
/// Messages accepted by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthMessage {
    /// Record (or replace) the witnesses authorized for `subject_id`.
    /// Ignored with a warning when `subject_id` is empty.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// List the identifiers of all authorized subjects.
    GetAuths,
    /// Return the witnesses recorded for `subject_id`; fails when the subject
    /// has not been authorized.
    GetAuth {
        subject_id: DigestIdentifier,
    },
    /// Remove the authorization of `subject_id`.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
    /// Start an update child actor for `subject_id`.
    ///
    /// With `strict == true` only the witnesses recorded in the auth table
    /// are asked and `objective` is not used. Otherwise the governance
    /// witnesses, `objective` (if any) and the recorded witnesses are merged.
    Update {
        subject_id: DigestIdentifier,
        objective: Option<PublicKey>,
        strict: bool,
    },
}

// Marker impl: lets `AuthMessage` be used as an actor message type.
impl Message for AuthMessage {}
234
/// Replies produced by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthResponse {
    /// Reply to `AuthMessage::GetAuths`: identifiers of authorized subjects.
    Auths { subjects: Vec<DigestIdentifier> },
    /// Reply to `AuthMessage::GetAuth`: witnesses recorded for the subject.
    Witnesses(HashSet<PublicKey>),
    /// Reply to every message that carries no payload back.
    None,
}

// Marker impl: lets `AuthResponse` be used as an actor response type.
impl Response for AuthResponse {}
243
/// Events persisted by the [`Auth`] actor and applied to its state.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthEvent {
    /// Inserts `subject_id` with the witness set derived from `witness`,
    /// replacing any previous entry.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// Removes `subject_id` from the authorization table.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
}

// Marker impl: lets `AuthEvent` be used as an actor event type.
impl Event for AuthEvent {}
258
259#[async_trait]
260impl Actor for Auth {
261 type Event = AuthEvent;
262 type Message = AuthMessage;
263 type Response = AuthResponse;
264
265 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
266 parent_span.map_or_else(
267 || info_span!("Auth"),
268 |parent_span| info_span!(parent: parent_span, "Auth"),
269 )
270 }
271
272 async fn pre_start(
273 &mut self,
274 ctx: &mut ActorContext<Self>,
275 ) -> Result<(), ActorError> {
276 if let Err(e) = self.init_store("auth", None, false, ctx).await {
277 error!(
278 error = %e,
279 "Failed to initialize auth store"
280 );
281 return Err(e);
282 }
283
284 Ok(())
285 }
286}
287
288#[async_trait]
289impl Handler<Self> for Auth {
290 async fn handle_message(
291 &mut self,
292 _sender: ActorPath,
293 msg: AuthMessage,
294 ctx: &mut ave_actors::ActorContext<Self>,
295 ) -> Result<AuthResponse, ActorError> {
296 match msg {
297 AuthMessage::GetAuth { subject_id } => {
298 if let Some(witnesses) = self.auth.get(&subject_id) {
299 debug!(
300 msg_type = "GetAuth",
301 subject_id = %subject_id,
302 "Retrieved auth witnesses"
303 );
304
305 return Ok(AuthResponse::Witnesses(witnesses.clone()));
306 } else {
307 debug!(
308 msg_type = "GetAuth",
309 subject_id = %subject_id,
310 "Subject is not authorized"
311 );
312 return Err(ActorError::Functional {
313 description: "The subject has not been authorized"
314 .to_owned(),
315 });
316 }
317 }
318 AuthMessage::DeleteAuth { subject_id } => {
319 self.on_event(
320 AuthEvent::DeleteAuth {
321 subject_id: subject_id.clone(),
322 },
323 ctx,
324 )
325 .await;
326
327 debug!(
328 msg_type = "DeleteAuth",
329 subject_id = %subject_id,
330 "Auth deleted successfully"
331 );
332 }
333 AuthMessage::NewAuth {
334 subject_id,
335 witness,
336 } => {
337 if !subject_id.is_empty() {
338 self.on_event(
339 AuthEvent::NewAuth {
340 subject_id: subject_id.clone(),
341 witness,
342 },
343 ctx,
344 )
345 .await;
346
347 debug!(
348 msg_type = "NewAuth",
349 subject_id = %subject_id,
350 "New auth created successfully"
351 );
352 } else {
353 warn!(
354 msg_type = "NewAuth",
355 witness = ?witness,
356 "Ignoring auth creation with empty subject_id"
357 );
358 }
359 }
360 AuthMessage::GetAuths => {
361 let subjects: Vec<DigestIdentifier> =
362 self.auth.keys().cloned().collect();
363 debug!(
364 msg_type = "GetAuths",
365 count = subjects.len(),
366 "Retrieved all authorized subjects"
367 );
368 return Ok(AuthResponse::Auths { subjects });
369 }
370 AuthMessage::Update {
371 subject_id,
372 objective,
373 strict,
374 } => {
375 let Some(network) = self.network.clone() else {
376 error!(
377 msg_type = "Update",
378 subject_id = %subject_id,
379 "Network is none"
380 );
381 return Err(ActorError::FunctionalCritical {
382 description: "network is none".to_string(),
383 });
384 };
385
386 let (witnesses, actual_sn, subject_kind_hint) = {
387 let auth_witnesses =
388 self.auth.get(&subject_id).cloned().unwrap_or_default();
389 let (mut witnesses, actual_sn, subject_kind_hint) =
390 if strict {
391 let (actual_sn, subject_kind_hint) =
392 Self::build_update_state(ctx, &subject_id)
393 .await?;
394 (auth_witnesses, actual_sn, subject_kind_hint)
395 } else {
396 let (
397 mut governance_witnesses,
398 actual_sn,
399 subject_kind_hint,
400 ) = Self::build_update_data(ctx, &subject_id)
401 .await?;
402
403 if let Some(witness) = objective {
404 governance_witnesses.insert(witness);
405 }
406
407 (
408 governance_witnesses
409 .union(&auth_witnesses)
410 .cloned()
411 .collect::<HashSet<PublicKey>>(),
412 actual_sn,
413 subject_kind_hint,
414 )
415 };
416 witnesses.remove(&self.our_key);
417
418 (witnesses, actual_sn, subject_kind_hint)
419 };
420
421 if witnesses.is_empty() {
422 warn!(
423 msg_type = "Update",
424 subject_id = %subject_id,
425 "Subject has no witnesses to ask for update"
426 );
427 return Err(ActorError::Functional {
428 description: "The subject has no witnesses to try to ask for an update".to_owned(),
429 });
430 } else {
431 let data = UpdateNew {
432 network,
433 subject_id: subject_id.clone(),
434 our_sn: actual_sn,
435 witnesses,
436 update_type: UpdateType::Auth,
437 subject_kind_hint,
438 round_retry_interval_secs: self
439 .round_retry_interval_secs,
440 max_round_retries: self.max_round_retries,
441 witness_retry_count: self.witness_retry_count,
442 witness_retry_interval_secs: self
443 .witness_retry_interval_secs,
444 };
445
446 let updater = Update::new(data);
447 if let Ok(child) =
448 ctx.create_child(&subject_id.to_string(), updater).await
449 {
450 if let Err(e) = child.tell(UpdateMessage::Run).await {
451 error!(
452 msg_type = "Update",
453 subject_id = %subject_id,
454 error = %e,
455 "Failed to send Run message to update actor"
456 );
457 return Err(emit_fail(ctx, e).await);
458 }
459
460 debug!(
461 msg_type = "Update",
462 subject_id = %subject_id,
463 "Update process initiated with multiple witnesses"
464 );
465 } else {
466 info!(
467 msg_type = "Update",
468 subject_id = %subject_id,
469 "An update is already in progress."
470 );
471 };
472 }
473 }
474 };
475
476 Ok(AuthResponse::None)
477 }
478
479 async fn on_event(
480 &mut self,
481 event: AuthEvent,
482 ctx: &mut ActorContext<Self>,
483 ) {
484 if let Err(e) = self.persist(&event, ctx).await {
485 error!(
486 event = ?event,
487 error = %e,
488 "Failed to persist auth event"
489 );
490 emit_fail(ctx, e).await;
491 }
492 }
493
494 async fn on_child_fault(
495 &mut self,
496 error: ActorError,
497 ctx: &mut ActorContext<Self>,
498 ) -> ChildAction {
499 error!(
500 error = %error,
501 "Child actor fault in auth"
502 );
503 emit_fail(ctx, error).await;
504 ChildAction::Stop
505 }
506}
507
508#[async_trait]
509impl PersistentActor for Auth {
510 type Persistence = LightPersistence;
511 type InitParams = AuthInitParams;
512
513 fn update(&mut self, state: Self) {
514 self.auth = state.auth;
515 }
516
517 fn create_initial(params: Self::InitParams) -> Self {
518 Self {
519 network: Some(params.network),
520 auth: HashMap::new(),
521 our_key: params.our_key,
522 round_retry_interval_secs: params.round_retry_interval_secs,
523 max_round_retries: params.max_round_retries,
524 witness_retry_count: params.witness_retry_count,
525 witness_retry_interval_secs: params.witness_retry_interval_secs,
526 }
527 }
528
529 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
531 match event {
532 AuthEvent::NewAuth {
533 subject_id,
534 witness,
535 } => {
536 let witnesses = match witness {
537 AuthWitness::One(public_key) => {
538 HashSet::from([public_key.clone()])
539 }
540 AuthWitness::Many(items) => items.iter().cloned().collect(),
541 AuthWitness::None => HashSet::default(),
542 };
543
544 self.auth.insert(subject_id.clone(), witnesses);
545 debug!(
546 event_type = "NewAuth",
547 subject_id = %subject_id,
548 "Applied new auth"
549 );
550 }
551 AuthEvent::DeleteAuth { subject_id } => {
552 self.auth.remove(subject_id);
553 debug!(
554 event_type = "DeleteAuth",
555 subject_id = %subject_id,
556 "Applied auth deletion"
557 );
558 }
559 };
560
561 Ok(())
562 }
563}
564
// `Auth` relies entirely on the provided items of `Storable`; nothing is
// overridden here.
#[async_trait]
impl Storable for Auth {}