Skip to main content

ave_core/auth/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::collections::HashSet;
14use std::sync::Arc;
15use tracing::{Span, debug, error, info, info_span, warn};
16
17use crate::helpers::network::service::NetworkSender;
18use crate::model::common::node::get_subject_data;
19use crate::model::common::subject::{
20    get_gov, get_gov_sn, get_tracker_sn_creator,
21};
22use crate::node::SubjectData;
23use crate::update::UpdateType;
24use crate::{
25    ActorMessage, NetworkMessage,
26    db::Storable,
27    governance::model::WitnessesData,
28    model::common::emit_fail,
29    update::{Update, UpdateMessage, UpdateNew},
30};
31
/// Actor state holding, per subject, the set of witnesses that have been
/// explicitly authorized for it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Auth {
    /// Handle used to send commands to the network. Skipped by serde and set
    /// to `None` when the state is rebuilt from Borsh bytes.
    #[serde(skip)]
    network: Option<Arc<NetworkSender>>,

    /// Key that is removed from the witness set before an update is
    /// requested (presumably this node's own key — confirm with the caller).
    /// Skipped by serde.
    #[serde(skip)]
    our_key: Arc<PublicKey>,

    /// Authorized witnesses keyed by subject identifier. This is the only
    /// field written by the manual Borsh implementation.
    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
}
42
/// Witness selection carried by a `NewAuth` message and event.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthWitness {
    /// Exactly one witness key.
    One(PublicKey),
    /// Several witness keys; they are stored in a `HashSet` when applied, so
    /// duplicates collapse.
    Many(Vec<PublicKey>),
    /// No explicit witness; applying it stores an empty set for the subject.
    None,
}
51
52impl BorshSerialize for Auth {
53    fn serialize<W: std::io::Write>(
54        &self,
55        writer: &mut W,
56    ) -> std::io::Result<()> {
57        // Serialize only the fields we want to persist, skipping 'owner'
58        BorshSerialize::serialize(&self.auth, writer)?;
59
60        Ok(())
61    }
62}
63
64impl BorshDeserialize for Auth {
65    fn deserialize_reader<R: std::io::Read>(
66        reader: &mut R,
67    ) -> std::io::Result<Self> {
68        // Deserialize the persisted fields
69        let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
70        let network = None;
71        let our_key = Arc::new(PublicKey::default());
72
73        Ok(Self {
74            network,
75            auth,
76            our_key,
77        })
78    }
79}
80
81impl Auth {
82    async fn build_update_data(
83        ctx: &mut ActorContext<Self>,
84        subject_id: &DigestIdentifier,
85    ) -> Result<(HashSet<PublicKey>, Option<u64>), ActorError> {
86        let data = get_subject_data(ctx, subject_id).await?;
87
88        let (witnesses, actual_sn) = if let Some(data) = &data {
89            match data {
90                SubjectData::Tracker {
91                    governance_id,
92                    schema_id,
93                    namespace,
94                    ..
95                } => {
96                    if let Some((creator, sn)) =
97                        get_tracker_sn_creator(ctx, governance_id, subject_id)
98                            .await?
99                    {
100                        let gov = get_gov(ctx, governance_id).await?;
101                        let witnesses = gov
102                            .get_witnesses(WitnessesData::Schema {
103                                creator,
104                                schema_id: schema_id.clone(),
105                                namespace: Namespace::from(
106                                    namespace.to_owned(),
107                                ),
108                            })
109                            .map_err(|e| {
110                                error!(
111                                    subject_id = %subject_id,
112                                    governance_id = %governance_id,
113                                    error = %e,
114                                    "Failed to get witnesses for tracker schema"
115                                );
116                                ActorError::Functional {
117                                    description: e.to_string(),
118                                }
119                            })?;
120
121                        (witnesses, Some(sn))
122                    } else {
123                        (HashSet::default(), None)
124                    }
125                }
126                SubjectData::Governance { .. } => {
127                    let gov = get_gov(ctx, subject_id).await?;
128                    let witnesses =
129                        gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
130                            warn!(
131                                subject_id = %subject_id,
132                                error = %e,
133                                "Failed to get witnesses for governance"
134                            );
135                            ActorError::Functional {
136                                description: e.to_string(),
137                            }
138                        })?;
139
140                    let sn = get_gov_sn(ctx, subject_id).await?;
141
142                    (witnesses, Some(sn))
143                }
144            }
145        } else {
146            (HashSet::default(), None)
147        };
148
149        Ok((witnesses, actual_sn))
150    }
151}
152
/// Messages accepted by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthMessage {
    /// Store `witness` as the authorized witnesses of `subject_id`. The
    /// handler ignores the message when `subject_id` is empty.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// List every subject that currently has an authorization entry.
    GetAuths,
    /// Return the authorized witnesses of `subject_id`; the handler answers
    /// with a functional error when there is no entry.
    GetAuth {
        subject_id: DigestIdentifier,
    },
    /// Remove the authorization entry of `subject_id`.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
    /// Ask the witnesses of `subject_id` for an update. `objective`, when
    /// present, is added to the set of witnesses to contact.
    Update {
        subject_id: DigestIdentifier,
        objective: Option<PublicKey>,
    },
}

// Marker impl: lets `AuthMessage` be used as an actor message.
impl Message for AuthMessage {}
173
/// Replies produced by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthResponse {
    /// Answer to `GetAuths`: the subjects that have an authorization entry.
    Auths { subjects: Vec<DigestIdentifier> },
    /// Answer to `GetAuth`: the witnesses authorized for the subject.
    Witnesses(HashSet<PublicKey>),
    /// Answer to the messages that carry no payload back.
    None,
}

// Marker impl: lets `AuthResponse` be used as an actor response.
impl Response for AuthResponse {}
182
/// Persisted events that change the authorization map of [`Auth`].
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthEvent {
    /// Replace the entry of `subject_id` with the keys described by
    /// `witness`.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// Remove the entry of `subject_id`.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
}

// Marker impl: lets `AuthEvent` be used as an actor event.
impl Event for AuthEvent {}
197
198#[async_trait]
199impl Actor for Auth {
200    type Event = AuthEvent;
201    type Message = AuthMessage;
202    type Response = AuthResponse;
203
204    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
205        parent_span.map_or_else(
206            || info_span!("Auth"),
207            |parent_span| info_span!(parent: parent_span, "Auth"),
208        )
209    }
210
211    async fn pre_start(
212        &mut self,
213        ctx: &mut ActorContext<Self>,
214    ) -> Result<(), ActorError> {
215        if let Err(e) = self.init_store("auth", None, false, ctx).await {
216            error!(
217                error = %e,
218                "Failed to initialize auth store"
219            );
220            return Err(e);
221        }
222        Ok(())
223    }
224}
225
226#[async_trait]
227impl Handler<Self> for Auth {
228    async fn handle_message(
229        &mut self,
230        _sender: ActorPath,
231        msg: AuthMessage,
232        ctx: &mut ave_actors::ActorContext<Self>,
233    ) -> Result<AuthResponse, ActorError> {
234        match msg {
235            AuthMessage::GetAuth { subject_id } => {
236                if let Some(witnesses) = self.auth.get(&subject_id) {
237                    debug!(
238                        msg_type = "GetAuth",
239                        subject_id = %subject_id,
240                        "Retrieved auth witnesses"
241                    );
242
243                    return Ok(AuthResponse::Witnesses(witnesses.clone()));
244                } else {
245                    return Err(ActorError::Functional {
246                        description: "The subject has not been authorized"
247                            .to_owned(),
248                    });
249                }
250            }
251            AuthMessage::DeleteAuth { subject_id } => {
252                self.on_event(
253                    AuthEvent::DeleteAuth {
254                        subject_id: subject_id.clone(),
255                    },
256                    ctx,
257                )
258                .await;
259
260                debug!(
261                    msg_type = "DeleteAuth",
262                    subject_id = %subject_id,
263                    "Auth deleted successfully"
264                );
265            }
266            AuthMessage::NewAuth {
267                subject_id,
268                witness,
269            } => {
270                if !subject_id.is_empty() {
271                    self.on_event(
272                        AuthEvent::NewAuth {
273                            subject_id: subject_id.clone(),
274                            witness,
275                        },
276                        ctx,
277                    )
278                    .await;
279
280                    debug!(
281                        msg_type = "NewAuth",
282                        subject_id = %subject_id,
283                        "New auth created successfully"
284                    );
285                }
286            }
287            AuthMessage::GetAuths => {
288                let subjects: Vec<DigestIdentifier> =
289                    self.auth.keys().cloned().collect();
290                debug!(
291                    msg_type = "GetAuths",
292                    count = subjects.len(),
293                    "Retrieved all authorized subjects"
294                );
295                return Ok(AuthResponse::Auths { subjects });
296            }
297            AuthMessage::Update {
298                subject_id,
299                objective,
300            } => {
301                let Some(network) = self.network.clone() else {
302                    error!(
303                        msg_type = "Update",
304                        subject_id = %subject_id,
305                        "Network is none"
306                    );
307                    return Err(ActorError::FunctionalCritical {
308                        description: "network is none".to_string(),
309                    });
310                };
311
312                let (witnesses, actual_sn) = {
313                    let (mut witnesses, actual_sn) =
314                        Self::build_update_data(ctx, &subject_id).await?;
315
316                    if let Some(witness) = objective {
317                        witnesses.insert(witness);
318                    }
319
320                    let auth_witnesses =
321                        self.auth.get(&subject_id).cloned().unwrap_or_default();
322
323                    let mut witnesses = witnesses
324                        .union(&auth_witnesses)
325                        .cloned()
326                        .collect::<HashSet<PublicKey>>();
327                    witnesses.remove(&self.our_key);
328
329                    (witnesses, actual_sn)
330                };
331
332                if witnesses.is_empty() {
333                    warn!(
334                        msg_type = "Update",
335                        subject_id = %subject_id,
336                        "Subject has no witnesses to ask for update"
337                    );
338                    return Err(ActorError::Functional {
339                        description: "The subject has no witnesses to try to ask for an update".to_owned(),
340                    });
341                } else if witnesses.len() == 1 {
342                    let objetive = witnesses.iter().next().expect("len is 1");
343                    let info = ComunicateInfo {
344                        receiver: objetive.clone(),
345                        request_id: String::default(),
346                        version: 0,
347                        receiver_actor: format!(
348                            "/user/node/distributor_{}",
349                            subject_id
350                        ),
351                    };
352
353                    if let Err(e) = network
354                        .send_command(network::CommandHelper::SendMessage {
355                            message: NetworkMessage {
356                                info,
357                                message: ActorMessage::DistributionLedgerReq {
358                                    actual_sn,
359                                    subject_id: subject_id.clone(),
360                                },
361                            },
362                        })
363                        .await
364                    {
365                        error!(
366                            msg_type = "Update",
367                            subject_id = %subject_id,
368                            error = %e,
369                            "Cannot send response to network"
370                        );
371                        return Err(emit_fail(ctx, e).await);
372                    };
373
374                    debug!(
375                        msg_type = "Update",
376                        subject_id = %subject_id,
377                        "Update message sent to single witness"
378                    );
379                } else {
380                    let data = UpdateNew {
381                        network,
382                        subject_id: subject_id.clone(),
383                        our_sn: actual_sn,
384                        witnesses,
385                        update_type: UpdateType::Auth,
386                    };
387
388                    let updater = Update::new(data);
389                    if let Ok(child) =
390                        ctx.create_child(&subject_id.to_string(), updater).await
391                    {
392                        if let Err(e) = child.tell(UpdateMessage::Run).await {
393                            error!(
394                                msg_type = "Update",
395                                subject_id = %subject_id,
396                                error = %e,
397                                "Failed to send Run message to update actor"
398                            );
399                            return Err(emit_fail(ctx, e).await);
400                        }
401
402                        debug!(
403                            msg_type = "Update",
404                            subject_id = %subject_id,
405                            "Update process initiated with multiple witnesses"
406                        );
407                    } else {
408                        info!(
409                            msg_type = "Update",
410                            subject_id = %subject_id,
411                            "An update is already in progress."
412                        );
413                    };
414                }
415            }
416        };
417
418        Ok(AuthResponse::None)
419    }
420
421    async fn on_event(
422        &mut self,
423        event: AuthEvent,
424        ctx: &mut ActorContext<Self>,
425    ) {
426        if let Err(e) = self.persist(&event, ctx).await {
427            error!(
428                event = ?event,
429                error = %e,
430                "Failed to persist auth event"
431            );
432            emit_fail(ctx, e).await;
433        }
434    }
435
436    async fn on_child_fault(
437        &mut self,
438        error: ActorError,
439        ctx: &mut ActorContext<Self>,
440    ) -> ChildAction {
441        error!(
442            error = %error,
443            "Child actor fault in auth"
444        );
445        emit_fail(ctx, error).await;
446        ChildAction::Stop
447    }
448}
449
450#[async_trait]
451impl PersistentActor for Auth {
452    type Persistence = LightPersistence;
453    type InitParams = (Arc<NetworkSender>, Arc<PublicKey>);
454
455    fn update(&mut self, state: Self) {
456        self.auth = state.auth;
457    }
458
459    fn create_initial(params: Self::InitParams) -> Self {
460        Self {
461            network: Some(params.0),
462            auth: HashMap::new(),
463            our_key: params.1,
464        }
465    }
466
467    /// Change node state.
468    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
469        match event {
470            AuthEvent::NewAuth {
471                subject_id,
472                witness,
473            } => {
474                let witnesses = match witness {
475                    AuthWitness::One(public_key) => {
476                        HashSet::from([public_key.clone()])
477                    }
478                    AuthWitness::Many(items) => items.iter().cloned().collect(),
479                    AuthWitness::None => HashSet::default(),
480                };
481
482                self.auth.insert(subject_id.clone(), witnesses);
483                debug!(
484                    event_type = "NewAuth",
485                    subject_id = %subject_id,
486                    "Applied new auth"
487                );
488            }
489            AuthEvent::DeleteAuth { subject_id } => {
490                self.auth.remove(subject_id);
491                debug!(
492                    event_type = "DeleteAuth",
493                    subject_id = %subject_id,
494                    "Applied auth deletion"
495                );
496            }
497        };
498
499        Ok(())
500    }
501}
502
// Empty impl: `Auth` overrides nothing and relies entirely on the items
// `Storable` provides by default.
#[async_trait]
impl Storable for Auth {}