Skip to main content

ave_core/auth/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::collections::HashSet;
13use std::sync::Arc;
14use tracing::{Span, debug, error, info, info_span, warn};
15
16use crate::helpers::network::service::NetworkSender;
17use crate::model::common::node::get_subject_data;
18use crate::model::common::subject::{
19    get_gov, get_gov_sn, get_tracker_sn_owner,
20};
21use crate::node::SubjectData;
22use crate::update::UpdateType;
23use crate::{
24    db::Storable,
25    governance::model::WitnessesData,
26    model::common::emit_fail,
27    update::{Update, UpdateMessage, UpdateNew, UpdateSubjectKind},
28};
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct Auth {
32    #[serde(skip)]
33    network: Option<Arc<NetworkSender>>,
34
35    #[serde(skip)]
36    our_key: Arc<PublicKey>,
37
38    #[serde(skip)]
39    round_retry_interval_secs: u64,
40
41    #[serde(skip)]
42    max_round_retries: usize,
43
44    #[serde(skip)]
45    witness_retry_count: usize,
46
47    #[serde(skip)]
48    witness_retry_interval_secs: u64,
49
50    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
51}
52
53#[derive(Clone, Debug)]
54pub struct AuthInitParams {
55    pub network: Arc<NetworkSender>,
56    pub our_key: Arc<PublicKey>,
57    pub round_retry_interval_secs: u64,
58    pub max_round_retries: usize,
59    pub witness_retry_count: usize,
60    pub witness_retry_interval_secs: u64,
61}
62
63#[derive(
64    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
65)]
66pub enum AuthWitness {
67    One(PublicKey),
68    Many(Vec<PublicKey>),
69    None,
70}
71
72impl BorshSerialize for Auth {
73    fn serialize<W: std::io::Write>(
74        &self,
75        writer: &mut W,
76    ) -> std::io::Result<()> {
77        // Serialize only the fields we want to persist, skipping 'owner'
78        BorshSerialize::serialize(&self.auth, writer)?;
79
80        Ok(())
81    }
82}
83
84impl BorshDeserialize for Auth {
85    fn deserialize_reader<R: std::io::Read>(
86        reader: &mut R,
87    ) -> std::io::Result<Self> {
88        // Deserialize the persisted fields
89        let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
90        let network = None;
91        let our_key = Arc::new(PublicKey::default());
92
93        Ok(Self {
94            network,
95            auth,
96            our_key,
97            round_retry_interval_secs: 10,
98            max_round_retries: 3,
99            witness_retry_count: 3,
100            witness_retry_interval_secs: 10,
101        })
102    }
103}
104
105impl Auth {
106    async fn build_update_state(
107        ctx: &mut ActorContext<Self>,
108        subject_id: &DigestIdentifier,
109    ) -> Result<(Option<u64>, Option<UpdateSubjectKind>), ActorError> {
110        let data = get_subject_data(ctx, subject_id).await?;
111
112        match data {
113            Some(SubjectData::Tracker { governance_id, .. }) => {
114                let actual_sn =
115                    get_tracker_sn_owner(ctx, &governance_id, subject_id)
116                        .await?
117                        .map(|(_, actual_sn)| actual_sn);
118
119                Ok((actual_sn, Some(UpdateSubjectKind::Tracker)))
120            }
121            Some(SubjectData::Governance { .. }) => {
122                let sn = get_gov_sn(ctx, subject_id).await?;
123                Ok((Some(sn), Some(UpdateSubjectKind::Governance)))
124            }
125            None => Ok((None, None)),
126        }
127    }
128
129    async fn build_update_data(
130        ctx: &mut ActorContext<Self>,
131        subject_id: &DigestIdentifier,
132    ) -> Result<
133        (HashSet<PublicKey>, Option<u64>, Option<UpdateSubjectKind>),
134        ActorError,
135    > {
136        let data = get_subject_data(ctx, subject_id).await?;
137
138        let (witnesses, actual_sn, subject_kind_hint) = if let Some(data) =
139            &data
140        {
141            match data {
142                SubjectData::Tracker {
143                    governance_id,
144                    schema_id,
145                    namespace,
146                    ..
147                } => {
148                    if let Some((owner, actual_sn)) =
149                        get_tracker_sn_owner(ctx, governance_id, subject_id)
150                            .await?
151                    {
152                        let gov = get_gov(ctx, governance_id).await?;
153                        let witnesses = gov
154                            .get_witnesses(WitnessesData::Schema {
155                                creator: owner,
156                                schema_id: schema_id.clone(),
157                                namespace: Namespace::from(
158                                    namespace.to_owned(),
159                                ),
160                            })
161                            .map_err(|e| {
162                                error!(
163                                    subject_id = %subject_id,
164                                    governance_id = %governance_id,
165                                    error = %e,
166                                    "Failed to get witnesses for tracker schema"
167                                );
168                                ActorError::Functional {
169                                    description: e.to_string(),
170                                }
171                            })?;
172
173                        (
174                            witnesses,
175                            Some(actual_sn),
176                            Some(UpdateSubjectKind::Tracker),
177                        )
178                    } else {
179                        (
180                            HashSet::default(),
181                            None,
182                            Some(UpdateSubjectKind::Tracker),
183                        )
184                    }
185                }
186                SubjectData::Governance { .. } => {
187                    let gov = get_gov(ctx, subject_id).await?;
188                    let witnesses =
189                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
190                            warn!(
191                                subject_id = %subject_id,
192                                error = %e,
193                                "Failed to get witnesses for governance"
194                            );
195                            ActorError::Functional {
196                                description: e.to_string(),
197                            }
198                        })?;
199
200                    let sn = get_gov_sn(ctx, subject_id).await?;
201
202                    (witnesses, Some(sn), Some(UpdateSubjectKind::Governance))
203                }
204            }
205        } else {
206            (HashSet::default(), None, None)
207        };
208
209        Ok((witnesses, actual_sn, subject_kind_hint))
210    }
211}
212
213#[derive(Debug, Clone)]
214pub enum AuthMessage {
215    NewAuth {
216        subject_id: DigestIdentifier,
217        witness: AuthWitness,
218    },
219    GetAuths,
220    GetAuth {
221        subject_id: DigestIdentifier,
222    },
223    DeleteAuth {
224        subject_id: DigestIdentifier,
225    },
226    Update {
227        subject_id: DigestIdentifier,
228        objective: Option<PublicKey>,
229        strict: bool,
230    },
231}
232
233impl Message for AuthMessage {}
234
235#[derive(Debug, Clone)]
236pub enum AuthResponse {
237    Auths { subjects: Vec<DigestIdentifier> },
238    Witnesses(HashSet<PublicKey>),
239    None,
240}
241
242impl Response for AuthResponse {}
243
244#[derive(
245    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
246)]
247pub enum AuthEvent {
248    NewAuth {
249        subject_id: DigestIdentifier,
250        witness: AuthWitness,
251    },
252    DeleteAuth {
253        subject_id: DigestIdentifier,
254    },
255}
256
257impl Event for AuthEvent {}
258
259#[async_trait]
260impl Actor for Auth {
261    type Event = AuthEvent;
262    type Message = AuthMessage;
263    type Response = AuthResponse;
264
265    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
266        parent_span.map_or_else(
267            || info_span!("Auth"),
268            |parent_span| info_span!(parent: parent_span, "Auth"),
269        )
270    }
271
272    async fn pre_start(
273        &mut self,
274        ctx: &mut ActorContext<Self>,
275    ) -> Result<(), ActorError> {
276        if let Err(e) = self.init_store("auth", None, false, ctx).await {
277            error!(
278                error = %e,
279                "Failed to initialize auth store"
280            );
281            return Err(e);
282        }
283
284        Ok(())
285    }
286}
287
288#[async_trait]
289impl Handler<Self> for Auth {
290    async fn handle_message(
291        &mut self,
292        _sender: ActorPath,
293        msg: AuthMessage,
294        ctx: &mut ave_actors::ActorContext<Self>,
295    ) -> Result<AuthResponse, ActorError> {
296        match msg {
297            AuthMessage::GetAuth { subject_id } => {
298                if let Some(witnesses) = self.auth.get(&subject_id) {
299                    debug!(
300                        msg_type = "GetAuth",
301                        subject_id = %subject_id,
302                        "Retrieved auth witnesses"
303                    );
304
305                    return Ok(AuthResponse::Witnesses(witnesses.clone()));
306                } else {
307                    debug!(
308                        msg_type = "GetAuth",
309                        subject_id = %subject_id,
310                        "Subject is not authorized"
311                    );
312                    return Err(ActorError::Functional {
313                        description: "The subject has not been authorized"
314                            .to_owned(),
315                    });
316                }
317            }
318            AuthMessage::DeleteAuth { subject_id } => {
319                self.on_event(
320                    AuthEvent::DeleteAuth {
321                        subject_id: subject_id.clone(),
322                    },
323                    ctx,
324                )
325                .await;
326
327                debug!(
328                    msg_type = "DeleteAuth",
329                    subject_id = %subject_id,
330                    "Auth deleted successfully"
331                );
332            }
333            AuthMessage::NewAuth {
334                subject_id,
335                witness,
336            } => {
337                if !subject_id.is_empty() {
338                    self.on_event(
339                        AuthEvent::NewAuth {
340                            subject_id: subject_id.clone(),
341                            witness,
342                        },
343                        ctx,
344                    )
345                    .await;
346
347                    debug!(
348                        msg_type = "NewAuth",
349                        subject_id = %subject_id,
350                        "New auth created successfully"
351                    );
352                } else {
353                    warn!(
354                        msg_type = "NewAuth",
355                        witness = ?witness,
356                        "Ignoring auth creation with empty subject_id"
357                    );
358                }
359            }
360            AuthMessage::GetAuths => {
361                let subjects: Vec<DigestIdentifier> =
362                    self.auth.keys().cloned().collect();
363                debug!(
364                    msg_type = "GetAuths",
365                    count = subjects.len(),
366                    "Retrieved all authorized subjects"
367                );
368                return Ok(AuthResponse::Auths { subjects });
369            }
370            AuthMessage::Update {
371                subject_id,
372                objective,
373                strict,
374            } => {
375                let Some(network) = self.network.clone() else {
376                    error!(
377                        msg_type = "Update",
378                        subject_id = %subject_id,
379                        "Network is none"
380                    );
381                    return Err(ActorError::FunctionalCritical {
382                        description: "network is none".to_string(),
383                    });
384                };
385
386                let (witnesses, actual_sn, subject_kind_hint) = {
387                    let auth_witnesses =
388                        self.auth.get(&subject_id).cloned().unwrap_or_default();
389                    let (mut witnesses, actual_sn, subject_kind_hint) =
390                        if strict {
391                            let (actual_sn, subject_kind_hint) =
392                                Self::build_update_state(ctx, &subject_id)
393                                    .await?;
394                            (auth_witnesses, actual_sn, subject_kind_hint)
395                        } else {
396                            let (
397                                mut governance_witnesses,
398                                actual_sn,
399                                subject_kind_hint,
400                            ) = Self::build_update_data(ctx, &subject_id)
401                                .await?;
402
403                            if let Some(witness) = objective {
404                                governance_witnesses.insert(witness);
405                            }
406
407                            (
408                                governance_witnesses
409                                    .union(&auth_witnesses)
410                                    .cloned()
411                                    .collect::<HashSet<PublicKey>>(),
412                                actual_sn,
413                                subject_kind_hint,
414                            )
415                        };
416                    witnesses.remove(&self.our_key);
417
418                    (witnesses, actual_sn, subject_kind_hint)
419                };
420
421                if witnesses.is_empty() {
422                    warn!(
423                        msg_type = "Update",
424                        subject_id = %subject_id,
425                        "Subject has no witnesses to ask for update"
426                    );
427                    return Err(ActorError::Functional {
428                        description: "The subject has no witnesses to try to ask for an update".to_owned(),
429                    });
430                } else {
431                    let data = UpdateNew {
432                        network,
433                        subject_id: subject_id.clone(),
434                        our_sn: actual_sn,
435                        witnesses,
436                        update_type: UpdateType::Auth,
437                        subject_kind_hint,
438                        round_retry_interval_secs: self
439                            .round_retry_interval_secs,
440                        max_round_retries: self.max_round_retries,
441                        witness_retry_count: self.witness_retry_count,
442                        witness_retry_interval_secs: self
443                            .witness_retry_interval_secs,
444                    };
445
446                    let updater = Update::new(data);
447                    if let Ok(child) =
448                        ctx.create_child(&subject_id.to_string(), updater).await
449                    {
450                        if let Err(e) = child.tell(UpdateMessage::Run).await {
451                            error!(
452                                msg_type = "Update",
453                                subject_id = %subject_id,
454                                error = %e,
455                                "Failed to send Run message to update actor"
456                            );
457                            return Err(emit_fail(ctx, e).await);
458                        }
459
460                        debug!(
461                            msg_type = "Update",
462                            subject_id = %subject_id,
463                            "Update process initiated with multiple witnesses"
464                        );
465                    } else {
466                        info!(
467                            msg_type = "Update",
468                            subject_id = %subject_id,
469                            "An update is already in progress."
470                        );
471                    };
472                }
473            }
474        };
475
476        Ok(AuthResponse::None)
477    }
478
479    async fn on_event(
480        &mut self,
481        event: AuthEvent,
482        ctx: &mut ActorContext<Self>,
483    ) {
484        if let Err(e) = self.persist(&event, ctx).await {
485            error!(
486                event = ?event,
487                error = %e,
488                "Failed to persist auth event"
489            );
490            emit_fail(ctx, e).await;
491        }
492    }
493
494    async fn on_child_fault(
495        &mut self,
496        error: ActorError,
497        ctx: &mut ActorContext<Self>,
498    ) -> ChildAction {
499        error!(
500            error = %error,
501            "Child actor fault in auth"
502        );
503        emit_fail(ctx, error).await;
504        ChildAction::Stop
505    }
506}
507
508#[async_trait]
509impl PersistentActor for Auth {
510    type Persistence = LightPersistence;
511    type InitParams = AuthInitParams;
512
513    fn update(&mut self, state: Self) {
514        self.auth = state.auth;
515    }
516
517    fn create_initial(params: Self::InitParams) -> Self {
518        Self {
519            network: Some(params.network),
520            auth: HashMap::new(),
521            our_key: params.our_key,
522            round_retry_interval_secs: params.round_retry_interval_secs,
523            max_round_retries: params.max_round_retries,
524            witness_retry_count: params.witness_retry_count,
525            witness_retry_interval_secs: params.witness_retry_interval_secs,
526        }
527    }
528
529    /// Change node state.
530    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
531        match event {
532            AuthEvent::NewAuth {
533                subject_id,
534                witness,
535            } => {
536                let witnesses = match witness {
537                    AuthWitness::One(public_key) => {
538                        HashSet::from([public_key.clone()])
539                    }
540                    AuthWitness::Many(items) => items.iter().cloned().collect(),
541                    AuthWitness::None => HashSet::default(),
542                };
543
544                self.auth.insert(subject_id.clone(), witnesses);
545                debug!(
546                    event_type = "NewAuth",
547                    subject_id = %subject_id,
548                    "Applied new auth"
549                );
550            }
551            AuthEvent::DeleteAuth { subject_id } => {
552                self.auth.remove(subject_id);
553                debug!(
554                    event_type = "DeleteAuth",
555                    subject_id = %subject_id,
556                    "Applied auth deletion"
557                );
558            }
559        };
560
561        Ok(())
562    }
563}
564
565#[async_trait]
566impl Storable for Auth {}