Skip to main content

ave_core/auth/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::collections::HashSet;
13use std::sync::Arc;
14use tracing::{Span, debug, error, info, info_span, warn};
15
16use crate::helpers::network::service::NetworkSender;
17use crate::model::common::node::get_subject_data;
18use crate::model::common::subject::{
19    get_gov, get_gov_sn, get_tracker_sn_owner,
20};
21use crate::node::SubjectData;
22use crate::update::UpdateType;
23use crate::{
24    db::Storable,
25    governance::model::WitnessesData,
26    model::common::emit_fail,
27    update::{Update, UpdateMessage, UpdateNew, UpdateSubjectKind},
28};
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct Auth {
32    #[serde(skip)]
33    network: Option<Arc<NetworkSender>>,
34
35    #[serde(skip)]
36    our_key: Arc<PublicKey>,
37
38    #[serde(skip)]
39    round_retry_interval_secs: u64,
40
41    #[serde(skip)]
42    max_round_retries: usize,
43
44    #[serde(skip)]
45    witness_retry_count: usize,
46
47    #[serde(skip)]
48    witness_retry_interval_secs: u64,
49
50    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
51}
52
53#[derive(Clone, Debug)]
54pub struct AuthInitParams {
55    pub network: Arc<NetworkSender>,
56    pub our_key: Arc<PublicKey>,
57    pub round_retry_interval_secs: u64,
58    pub max_round_retries: usize,
59    pub witness_retry_count: usize,
60    pub witness_retry_interval_secs: u64,
61}
62
63#[derive(
64    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
65)]
66pub enum AuthWitness {
67    One(PublicKey),
68    Many(Vec<PublicKey>),
69    None,
70}
71
72impl BorshSerialize for Auth {
73    fn serialize<W: std::io::Write>(
74        &self,
75        writer: &mut W,
76    ) -> std::io::Result<()> {
77        // Serialize only the fields we want to persist, skipping 'owner'
78        BorshSerialize::serialize(&self.auth, writer)?;
79
80        Ok(())
81    }
82}
83
84impl BorshDeserialize for Auth {
85    fn deserialize_reader<R: std::io::Read>(
86        reader: &mut R,
87    ) -> std::io::Result<Self> {
88        // Deserialize the persisted fields
89        let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
90        let network = None;
91        let our_key = Arc::new(PublicKey::default());
92
93        Ok(Self {
94            network,
95            auth,
96            our_key,
97            round_retry_interval_secs: 10,
98            max_round_retries: 3,
99            witness_retry_count: 3,
100            witness_retry_interval_secs: 10,
101        })
102    }
103}
104
105impl Auth {
106    async fn build_update_state(
107        ctx: &mut ActorContext<Self>,
108        subject_id: &DigestIdentifier,
109    ) -> Result<(Option<u64>, Option<UpdateSubjectKind>), ActorError> {
110        let data = get_subject_data(ctx, subject_id).await?;
111
112        match data {
113            Some(SubjectData::Tracker { governance_id, .. }) => {
114                let actual_sn = get_tracker_sn_owner(
115                    ctx,
116                    &governance_id,
117                    subject_id,
118                )
119                .await?
120                .map(|(_, actual_sn)| actual_sn);
121
122                Ok((actual_sn, Some(UpdateSubjectKind::Tracker)))
123            }
124            Some(SubjectData::Governance { .. }) => {
125                let sn = get_gov_sn(ctx, subject_id).await?;
126                Ok((Some(sn), Some(UpdateSubjectKind::Governance)))
127            }
128            None => Ok((None, None)),
129        }
130    }
131
132    async fn build_update_data(
133        ctx: &mut ActorContext<Self>,
134        subject_id: &DigestIdentifier,
135    ) -> Result<
136        (HashSet<PublicKey>, Option<u64>, Option<UpdateSubjectKind>),
137        ActorError,
138    > {
139        let data = get_subject_data(ctx, subject_id).await?;
140
141        let (witnesses, actual_sn, subject_kind_hint) = if let Some(data) =
142            &data
143        {
144            match data {
145                SubjectData::Tracker {
146                    governance_id,
147                    schema_id,
148                    namespace,
149                    ..
150                } => {
151                    if let Some((owner, actual_sn)) =
152                        get_tracker_sn_owner(ctx, governance_id, subject_id)
153                            .await?
154                    {
155                        let gov = get_gov(ctx, governance_id).await?;
156                        let witnesses = gov
157                            .get_witnesses(WitnessesData::Schema {
158                                creator: owner,
159                                schema_id: schema_id.clone(),
160                                namespace: Namespace::from(
161                                    namespace.to_owned(),
162                                ),
163                            })
164                            .map_err(|e| {
165                                error!(
166                                    subject_id = %subject_id,
167                                    governance_id = %governance_id,
168                                    error = %e,
169                                    "Failed to get witnesses for tracker schema"
170                                );
171                                ActorError::Functional {
172                                    description: e.to_string(),
173                                }
174                            })?;
175
176                        (
177                            witnesses,
178                            Some(actual_sn),
179                            Some(UpdateSubjectKind::Tracker),
180                        )
181                    } else {
182                        (
183                            HashSet::default(),
184                            None,
185                            Some(UpdateSubjectKind::Tracker),
186                        )
187                    }
188                }
189                SubjectData::Governance { .. } => {
190                    let gov = get_gov(ctx, subject_id).await?;
191                    let witnesses =
192                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
193                            warn!(
194                                subject_id = %subject_id,
195                                error = %e,
196                                "Failed to get witnesses for governance"
197                            );
198                            ActorError::Functional {
199                                description: e.to_string(),
200                            }
201                        })?;
202
203                    let sn = get_gov_sn(ctx, subject_id).await?;
204
205                    (witnesses, Some(sn), Some(UpdateSubjectKind::Governance))
206                }
207            }
208        } else {
209            (HashSet::default(), None, None)
210        };
211
212        Ok((witnesses, actual_sn, subject_kind_hint))
213    }
214}
215
216#[derive(Debug, Clone)]
217pub enum AuthMessage {
218    NewAuth {
219        subject_id: DigestIdentifier,
220        witness: AuthWitness,
221    },
222    GetAuths,
223    GetAuth {
224        subject_id: DigestIdentifier,
225    },
226    DeleteAuth {
227        subject_id: DigestIdentifier,
228    },
229    Update {
230        subject_id: DigestIdentifier,
231        objective: Option<PublicKey>,
232        strict: bool,
233    },
234}
235
236impl Message for AuthMessage {}
237
238#[derive(Debug, Clone)]
239pub enum AuthResponse {
240    Auths { subjects: Vec<DigestIdentifier> },
241    Witnesses(HashSet<PublicKey>),
242    None,
243}
244
245impl Response for AuthResponse {}
246
247#[derive(
248    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
249)]
250pub enum AuthEvent {
251    NewAuth {
252        subject_id: DigestIdentifier,
253        witness: AuthWitness,
254    },
255    DeleteAuth {
256        subject_id: DigestIdentifier,
257    },
258}
259
260impl Event for AuthEvent {}
261
262#[async_trait]
263impl Actor for Auth {
264    type Event = AuthEvent;
265    type Message = AuthMessage;
266    type Response = AuthResponse;
267
268    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
269        parent_span.map_or_else(
270            || info_span!("Auth"),
271            |parent_span| info_span!(parent: parent_span, "Auth"),
272        )
273    }
274
275    async fn pre_start(
276        &mut self,
277        ctx: &mut ActorContext<Self>,
278    ) -> Result<(), ActorError> {
279        if let Err(e) = self.init_store("auth", None, false, ctx).await {
280            error!(
281                error = %e,
282                "Failed to initialize auth store"
283            );
284            return Err(e);
285        }
286
287        Ok(())
288    }
289}
290
291#[async_trait]
292impl Handler<Self> for Auth {
293    async fn handle_message(
294        &mut self,
295        _sender: ActorPath,
296        msg: AuthMessage,
297        ctx: &mut ave_actors::ActorContext<Self>,
298    ) -> Result<AuthResponse, ActorError> {
299        match msg {
300            AuthMessage::GetAuth { subject_id } => {
301                if let Some(witnesses) = self.auth.get(&subject_id) {
302                    debug!(
303                        msg_type = "GetAuth",
304                        subject_id = %subject_id,
305                        "Retrieved auth witnesses"
306                    );
307
308                    return Ok(AuthResponse::Witnesses(witnesses.clone()));
309                } else {
310                    debug!(
311                        msg_type = "GetAuth",
312                        subject_id = %subject_id,
313                        "Subject is not authorized"
314                    );
315                    return Err(ActorError::Functional {
316                        description: "The subject has not been authorized"
317                            .to_owned(),
318                    });
319                }
320            }
321            AuthMessage::DeleteAuth { subject_id } => {
322                self.on_event(
323                    AuthEvent::DeleteAuth {
324                        subject_id: subject_id.clone(),
325                    },
326                    ctx,
327                )
328                .await;
329
330                debug!(
331                    msg_type = "DeleteAuth",
332                    subject_id = %subject_id,
333                    "Auth deleted successfully"
334                );
335            }
336            AuthMessage::NewAuth {
337                subject_id,
338                witness,
339            } => {
340                if !subject_id.is_empty() {
341                    self.on_event(
342                        AuthEvent::NewAuth {
343                            subject_id: subject_id.clone(),
344                            witness,
345                        },
346                        ctx,
347                    )
348                    .await;
349
350                    debug!(
351                        msg_type = "NewAuth",
352                        subject_id = %subject_id,
353                        "New auth created successfully"
354                    );
355                } else {
356                    warn!(
357                        msg_type = "NewAuth",
358                        witness = ?witness,
359                        "Ignoring auth creation with empty subject_id"
360                    );
361                }
362            }
363            AuthMessage::GetAuths => {
364                let subjects: Vec<DigestIdentifier> =
365                    self.auth.keys().cloned().collect();
366                debug!(
367                    msg_type = "GetAuths",
368                    count = subjects.len(),
369                    "Retrieved all authorized subjects"
370                );
371                return Ok(AuthResponse::Auths { subjects });
372            }
373            AuthMessage::Update {
374                subject_id,
375                objective,
376                strict,
377            } => {
378                let Some(network) = self.network.clone() else {
379                    error!(
380                        msg_type = "Update",
381                        subject_id = %subject_id,
382                        "Network is none"
383                    );
384                    return Err(ActorError::FunctionalCritical {
385                        description: "network is none".to_string(),
386                    });
387                };
388
389                let (witnesses, actual_sn, subject_kind_hint) = {
390                    let auth_witnesses =
391                        self.auth.get(&subject_id).cloned().unwrap_or_default();
392                    let (mut witnesses, actual_sn, subject_kind_hint) =
393                        if strict {
394                            let (actual_sn, subject_kind_hint) =
395                                Self::build_update_state(ctx, &subject_id)
396                                    .await?;
397                            (auth_witnesses, actual_sn, subject_kind_hint)
398                        } else {
399                            let (
400                                mut governance_witnesses,
401                                actual_sn,
402                                subject_kind_hint,
403                            ) = Self::build_update_data(ctx, &subject_id)
404                                .await?;
405
406                            if let Some(witness) = objective {
407                                governance_witnesses.insert(witness);
408                            }
409
410                            (
411                                governance_witnesses
412                                    .union(&auth_witnesses)
413                                    .cloned()
414                                    .collect::<HashSet<PublicKey>>(),
415                                actual_sn,
416                                subject_kind_hint,
417                            )
418                        };
419                    witnesses.remove(&self.our_key);
420
421                    (witnesses, actual_sn, subject_kind_hint)
422                };
423
424                if witnesses.is_empty() {
425                    warn!(
426                        msg_type = "Update",
427                        subject_id = %subject_id,
428                        "Subject has no witnesses to ask for update"
429                    );
430                    return Err(ActorError::Functional {
431                        description: "The subject has no witnesses to try to ask for an update".to_owned(),
432                    });
433                } else {
434                    let data = UpdateNew {
435                        network,
436                        subject_id: subject_id.clone(),
437                        our_sn: actual_sn,
438                        witnesses,
439                        update_type: UpdateType::Auth,
440                        subject_kind_hint,
441                        round_retry_interval_secs: self
442                            .round_retry_interval_secs,
443                        max_round_retries: self.max_round_retries,
444                        witness_retry_count: self.witness_retry_count,
445                        witness_retry_interval_secs: self
446                            .witness_retry_interval_secs,
447                    };
448
449                    let updater = Update::new(data);
450                    if let Ok(child) =
451                        ctx.create_child(&subject_id.to_string(), updater).await
452                    {
453                        if let Err(e) = child.tell(UpdateMessage::Run).await {
454                            error!(
455                                msg_type = "Update",
456                                subject_id = %subject_id,
457                                error = %e,
458                                "Failed to send Run message to update actor"
459                            );
460                            return Err(emit_fail(ctx, e).await);
461                        }
462
463                        debug!(
464                            msg_type = "Update",
465                            subject_id = %subject_id,
466                            "Update process initiated with multiple witnesses"
467                        );
468                    } else {
469                        info!(
470                            msg_type = "Update",
471                            subject_id = %subject_id,
472                            "An update is already in progress."
473                        );
474                    };
475                }
476            }
477        };
478
479        Ok(AuthResponse::None)
480    }
481
482    async fn on_event(
483        &mut self,
484        event: AuthEvent,
485        ctx: &mut ActorContext<Self>,
486    ) {
487        if let Err(e) = self.persist(&event, ctx).await {
488            error!(
489                event = ?event,
490                error = %e,
491                "Failed to persist auth event"
492            );
493            emit_fail(ctx, e).await;
494        }
495    }
496
497    async fn on_child_fault(
498        &mut self,
499        error: ActorError,
500        ctx: &mut ActorContext<Self>,
501    ) -> ChildAction {
502        error!(
503            error = %error,
504            "Child actor fault in auth"
505        );
506        emit_fail(ctx, error).await;
507        ChildAction::Stop
508    }
509}
510
511#[async_trait]
512impl PersistentActor for Auth {
513    type Persistence = LightPersistence;
514    type InitParams = AuthInitParams;
515
516    fn update(&mut self, state: Self) {
517        self.auth = state.auth;
518    }
519
520    fn create_initial(params: Self::InitParams) -> Self {
521        Self {
522            network: Some(params.network),
523            auth: HashMap::new(),
524            our_key: params.our_key,
525            round_retry_interval_secs: params.round_retry_interval_secs,
526            max_round_retries: params.max_round_retries,
527            witness_retry_count: params.witness_retry_count,
528            witness_retry_interval_secs: params.witness_retry_interval_secs,
529        }
530    }
531
532    /// Change node state.
533    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
534        match event {
535            AuthEvent::NewAuth {
536                subject_id,
537                witness,
538            } => {
539                let witnesses = match witness {
540                    AuthWitness::One(public_key) => {
541                        HashSet::from([public_key.clone()])
542                    }
543                    AuthWitness::Many(items) => items.iter().cloned().collect(),
544                    AuthWitness::None => HashSet::default(),
545                };
546
547                self.auth.insert(subject_id.clone(), witnesses);
548                debug!(
549                    event_type = "NewAuth",
550                    subject_id = %subject_id,
551                    "Applied new auth"
552                );
553            }
554            AuthEvent::DeleteAuth { subject_id } => {
555                self.auth.remove(subject_id);
556                debug!(
557                    event_type = "DeleteAuth",
558                    subject_id = %subject_id,
559                    "Applied auth deletion"
560                );
561            }
562        };
563
564        Ok(())
565    }
566}
567
568#[async_trait]
569impl Storable for Auth {}