Skip to main content

ave_core/auth/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::collections::HashSet;
14use std::sync::Arc;
15use tracing::{Span, debug, error, info, info_span, warn};
16
17use crate::helpers::network::service::NetworkSender;
18use crate::model::common::node::get_subject_data;
19use crate::model::common::subject::{
20    get_gov, get_gov_sn, get_tracker_sn_creator,
21};
22use crate::node::SubjectData;
23use crate::update::UpdateType;
24use crate::{
25    ActorMessage, NetworkMessage,
26    db::Storable,
27    governance::model::WitnessesData,
28    model::common::emit_fail,
29    update::{Update, UpdateMessage, UpdateNew},
30};
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct Auth {
34    #[serde(skip)]
35    network: Option<Arc<NetworkSender>>,
36
37    #[serde(skip)]
38    our_key: Arc<PublicKey>,
39
40    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
41}
42
43#[derive(
44    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
45)]
46pub enum AuthWitness {
47    One(PublicKey),
48    Many(Vec<PublicKey>),
49    None,
50}
51
52impl BorshSerialize for Auth {
53    fn serialize<W: std::io::Write>(
54        &self,
55        writer: &mut W,
56    ) -> std::io::Result<()> {
57        // Serialize only the fields we want to persist, skipping 'owner'
58        BorshSerialize::serialize(&self.auth, writer)?;
59
60        Ok(())
61    }
62}
63
64impl BorshDeserialize for Auth {
65    fn deserialize_reader<R: std::io::Read>(
66        reader: &mut R,
67    ) -> std::io::Result<Self> {
68        // Deserialize the persisted fields
69        let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
70        let network = None;
71        let our_key = Arc::new(PublicKey::default());
72
73        Ok(Self {
74            network,
75            auth,
76            our_key,
77        })
78    }
79}
80
81impl Auth {
82    async fn build_update_data(
83        ctx: &mut ActorContext<Self>,
84        subject_id: &DigestIdentifier,
85    ) -> Result<(HashSet<PublicKey>, Option<u64>), ActorError> {
86        let data = get_subject_data(ctx, subject_id).await?;
87
88        let (witnesses, actual_sn) = if let Some(data) = &data {
89            match data {
90                SubjectData::Tracker {
91                    governance_id,
92                    schema_id,
93                    namespace,
94                    ..
95                } => {
96                    if let Some((creator, sn)) =
97                        get_tracker_sn_creator(ctx, governance_id, subject_id)
98                            .await?
99                    {
100                        let gov = get_gov(ctx, governance_id).await?;
101                        let witnesses = gov
102                            .get_witnesses(WitnessesData::Schema {
103                                creator,
104                                schema_id: schema_id.clone(),
105                                namespace: Namespace::from(
106                                    namespace.to_owned(),
107                                ),
108                            })
109                            .map_err(|e| {
110                                error!(
111                                    subject_id = %subject_id,
112                                    governance_id = %governance_id,
113                                    error = %e,
114                                    "Failed to get witnesses for tracker schema"
115                                );
116                                ActorError::Functional {
117                                    description: e.to_string(),
118                                }
119                            })?;
120
121                        (witnesses, Some(sn))
122                    } else {
123                        (HashSet::default(), None)
124                    }
125                }
126                SubjectData::Governance { .. } => {
127                    let gov = get_gov(ctx, subject_id).await?;
128                    let witnesses =
129                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
130                            warn!(
131                                subject_id = %subject_id,
132                                error = %e,
133                                "Failed to get witnesses for governance"
134                            );
135                            ActorError::Functional {
136                                description: e.to_string(),
137                            }
138                        })?;
139
140                    let sn = get_gov_sn(ctx, subject_id).await?;
141
142                    (witnesses, Some(sn))
143                }
144            }
145        } else {
146            (HashSet::default(), None)
147        };
148
149        Ok((witnesses, actual_sn))
150    }
151}
152
153#[derive(Debug, Clone)]
154pub enum AuthMessage {
155    NewAuth {
156        subject_id: DigestIdentifier,
157        witness: AuthWitness,
158    },
159    GetAuths,
160    GetAuth {
161        subject_id: DigestIdentifier,
162    },
163    DeleteAuth {
164        subject_id: DigestIdentifier,
165    },
166    Update {
167        subject_id: DigestIdentifier,
168        objective: Option<PublicKey>,
169    },
170}
171
172impl Message for AuthMessage {}
173
174#[derive(Debug, Clone)]
175pub enum AuthResponse {
176    Auths { subjects: Vec<DigestIdentifier> },
177    Witnesses(HashSet<PublicKey>),
178    None,
179}
180
181impl Response for AuthResponse {}
182
183#[derive(
184    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
185)]
186pub enum AuthEvent {
187    NewAuth {
188        subject_id: DigestIdentifier,
189        witness: AuthWitness,
190    },
191    DeleteAuth {
192        subject_id: DigestIdentifier,
193    },
194}
195
196impl Event for AuthEvent {}
197
198#[async_trait]
199impl Actor for Auth {
200    type Event = AuthEvent;
201    type Message = AuthMessage;
202    type Response = AuthResponse;
203
204    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
205        parent_span.map_or_else(
206            || info_span!("Auth"),
207            |parent_span| info_span!(parent: parent_span, "Auth"),
208        )
209    }
210
211    async fn pre_start(
212        &mut self,
213        ctx: &mut ActorContext<Self>,
214    ) -> Result<(), ActorError> {
215        if let Err(e) = self.init_store("auth", None, false, ctx).await {
216            error!(
217                error = %e,
218                "Failed to initialize auth store"
219            );
220            return Err(e);
221        }
222        Ok(())
223    }
224}
225
226#[async_trait]
227impl Handler<Self> for Auth {
228    async fn handle_message(
229        &mut self,
230        _sender: ActorPath,
231        msg: AuthMessage,
232        ctx: &mut ave_actors::ActorContext<Self>,
233    ) -> Result<AuthResponse, ActorError> {
234        match msg {
235            AuthMessage::GetAuth { subject_id } => {
236                if let Some(witnesses) = self.auth.get(&subject_id) {
237                    debug!(
238                        msg_type = "GetAuth",
239                        subject_id = %subject_id,
240                        "Retrieved auth witnesses"
241                    );
242
243                    return Ok(AuthResponse::Witnesses(witnesses.clone()));
244                } else {
245                    warn!(
246                        msg_type = "GetAuth",
247                        subject_id = %subject_id,
248                        "Subject has not been authorized"
249                    );
250                    return Err(ActorError::Functional {
251                        description: "The subject has not been authorized"
252                            .to_owned(),
253                    });
254                }
255            }
256            AuthMessage::DeleteAuth { subject_id } => {
257                self.on_event(
258                    AuthEvent::DeleteAuth {
259                        subject_id: subject_id.clone(),
260                    },
261                    ctx,
262                )
263                .await;
264
265                debug!(
266                    msg_type = "DeleteAuth",
267                    subject_id = %subject_id,
268                    "Auth deleted successfully"
269                );
270            }
271            AuthMessage::NewAuth {
272                subject_id,
273                witness,
274            } => {
275                if !subject_id.is_empty() {
276                    self.on_event(
277                        AuthEvent::NewAuth {
278                            subject_id: subject_id.clone(),
279                            witness,
280                        },
281                        ctx,
282                    )
283                    .await;
284
285                    debug!(
286                        msg_type = "NewAuth",
287                        subject_id = %subject_id,
288                        "New auth created successfully"
289                    );
290                }
291            }
292            AuthMessage::GetAuths => {
293                let subjects: Vec<DigestIdentifier> =
294                    self.auth.keys().cloned().collect();
295                debug!(
296                    msg_type = "GetAuths",
297                    count = subjects.len(),
298                    "Retrieved all authorized subjects"
299                );
300                return Ok(AuthResponse::Auths { subjects });
301            }
302            AuthMessage::Update {
303                subject_id,
304                objective,
305            } => {
306                let Some(network) = self.network.clone() else {
307                    error!(
308                        msg_type = "Update",
309                        subject_id = %subject_id,
310                        "Network is none"
311                    );
312                    return Err(ActorError::FunctionalCritical {
313                        description: "network is none".to_string(),
314                    });
315                };
316
317                let (witnesses, actual_sn) = {
318                    let (mut witnesses, actual_sn) =
319                        Self::build_update_data(ctx, &subject_id).await?;
320
321                    if let Some(witness) = objective {
322                        witnesses.insert(witness);
323                    }
324
325                    let auth_witnesses =
326                        self.auth.get(&subject_id).cloned().unwrap_or_default();
327
328                    let mut witnesses = witnesses
329                        .union(&auth_witnesses)
330                        .cloned()
331                        .collect::<HashSet<PublicKey>>();
332                    witnesses.remove(&self.our_key);
333
334                    (witnesses, actual_sn)
335                };
336
337                if witnesses.is_empty() {
338                    warn!(
339                        msg_type = "Update",
340                        subject_id = %subject_id,
341                        "Subject has no witnesses to ask for update"
342                    );
343                    return Err(ActorError::Functional {
344                        description: "The subject has no witnesses to try to ask for an update".to_owned(),
345                    });
346                } else if witnesses.len() == 1 {
347                    let objetive = witnesses.iter().next().expect("len is 1");
348                    let info = ComunicateInfo {
349                        receiver: objetive.clone(),
350                        request_id: String::default(),
351                        version: 0,
352                        receiver_actor: format!(
353                            "/user/node/distributor_{}",
354                            subject_id
355                        ),
356                    };
357
358                    if let Err(e) = network
359                        .send_command(network::CommandHelper::SendMessage {
360                            message: NetworkMessage {
361                                info,
362                                message: ActorMessage::DistributionLedgerReq {
363                                    actual_sn,
364                                    subject_id: subject_id.clone(),
365                                },
366                            },
367                        })
368                        .await
369                    {
370                        error!(
371                            msg_type = "Update",
372                            subject_id = %subject_id,
373                            error = %e,
374                            "Cannot send response to network"
375                        );
376                        return Err(emit_fail(ctx, e).await);
377                    };
378
379                    debug!(
380                        msg_type = "Update",
381                        subject_id = %subject_id,
382                        "Update message sent to single witness"
383                    );
384                } else {
385                    let data = UpdateNew {
386                        network,
387                        subject_id: subject_id.clone(),
388                        our_sn: actual_sn,
389                        witnesses,
390                        update_type: UpdateType::Auth,
391                    };
392
393                    let updater = Update::new(data);
394                    if let Ok(child) =
395                        ctx.create_child(&subject_id.to_string(), updater).await
396                    {
397                        if let Err(e) = child.tell(UpdateMessage::Run).await {
398                            error!(
399                                msg_type = "Update",
400                                subject_id = %subject_id,
401                                error = %e,
402                                "Failed to send Run message to update actor"
403                            );
404                            return Err(emit_fail(ctx, e).await);
405                        }
406
407                        debug!(
408                            msg_type = "Update",
409                            subject_id = %subject_id,
410                            "Update process initiated with multiple witnesses"
411                        );
412                    } else {
413                        info!(
414                            msg_type = "Update",
415                            subject_id = %subject_id,
416                            "An update is already in progress."
417                        );
418                    };
419                }
420            }
421        };
422
423        Ok(AuthResponse::None)
424    }
425
426    async fn on_event(
427        &mut self,
428        event: AuthEvent,
429        ctx: &mut ActorContext<Self>,
430    ) {
431        if let Err(e) = self.persist(&event, ctx).await {
432            error!(
433                event = ?event,
434                error = %e,
435                "Failed to persist auth event"
436            );
437            emit_fail(ctx, e).await;
438        }
439    }
440
441    async fn on_child_fault(
442        &mut self,
443        error: ActorError,
444        ctx: &mut ActorContext<Self>,
445    ) -> ChildAction {
446        error!(
447            error = %error,
448            "Child actor fault in auth"
449        );
450        emit_fail(ctx, error).await;
451        ChildAction::Stop
452    }
453}
454
455#[async_trait]
456impl PersistentActor for Auth {
457    type Persistence = LightPersistence;
458    type InitParams = (Arc<NetworkSender>, Arc<PublicKey>);
459
460    fn update(&mut self, state: Self) {
461        self.auth = state.auth;
462    }
463
464    fn create_initial(params: Self::InitParams) -> Self {
465        Self {
466            network: Some(params.0),
467            auth: HashMap::new(),
468            our_key: params.1,
469        }
470    }
471
472    /// Change node state.
473    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
474        match event {
475            AuthEvent::NewAuth {
476                subject_id,
477                witness,
478            } => {
479                let witnesses = match witness {
480                    AuthWitness::One(public_key) => {
481                        HashSet::from([public_key.clone()])
482                    }
483                    AuthWitness::Many(items) => items.iter().cloned().collect(),
484                    AuthWitness::None => HashSet::default(),
485                };
486
487                self.auth.insert(subject_id.clone(), witnesses);
488                debug!(
489                    event_type = "NewAuth",
490                    subject_id = %subject_id,
491                    "Applied new auth"
492                );
493            }
494            AuthEvent::DeleteAuth { subject_id } => {
495                self.auth.remove(subject_id);
496                debug!(
497                    event_type = "DeleteAuth",
498                    subject_id = %subject_id,
499                    "Applied auth deletion"
500                );
501            }
502        };
503
504        Ok(())
505    }
506}
507
508#[async_trait]
509impl Storable for Auth {}