1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::identity::{DigestIdentifier, PublicKey};
9use borsh::{BorshDeserialize, BorshSerialize};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::collections::HashSet;
14use std::sync::Arc;
15use tracing::{Span, debug, error, info, info_span, warn};
16
17use crate::helpers::network::service::NetworkSender;
18use crate::model::common::node::get_subject_data;
19use crate::model::common::subject::{
20 get_gov, get_gov_sn, get_tracker_sn_creator,
21};
22use crate::node::SubjectData;
23use crate::update::UpdateType;
24use crate::{
25 ActorMessage, NetworkMessage,
26 db::Storable,
27 governance::model::WitnessesData,
28 model::common::emit_fail,
29 update::{Update, UpdateMessage, UpdateNew},
30};
31
/// Actor state that tracks which subjects are authorized and which witnesses
/// (public keys) are associated with each of them.
///
/// Only the `auth` map is part of the serialized form: the other two fields
/// are marked `#[serde(skip)]` and the manual Borsh impls in this file write
/// and read nothing but `auth`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Auth {
    /// Handle used to send commands to the network; `None` after
    /// deserialization, `Some` when built through `create_initial`.
    #[serde(skip)]
    network: Option<Arc<NetworkSender>>,

    /// Key that is removed from the witness set before asking for an update
    /// (see the `Update` message handling).
    // NOTE(review): presumably this node's own public key — confirm with the caller.
    #[serde(skip)]
    our_key: Arc<PublicKey>,

    /// Authorized subjects, each mapped to its explicitly registered witnesses.
    auth: HashMap<DigestIdentifier, HashSet<PublicKey>>,
}
42
/// Witness specification carried by a new authorization.
///
/// When the corresponding event is applied it is turned into a set of public
/// keys: one key, the de-duplicated list, or an empty set.
// NOTE(review): this enum derives Borsh, so variant order is presumably part of
// the persisted encoding — confirm before reordering or inserting variants.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthWitness {
    /// Exactly one witness.
    One(PublicKey),
    /// Several witnesses; duplicates collapse when collected into a set.
    Many(Vec<PublicKey>),
    /// No explicit witness; the subject is stored with an empty witness set.
    None,
}
51
52impl BorshSerialize for Auth {
53 fn serialize<W: std::io::Write>(
54 &self,
55 writer: &mut W,
56 ) -> std::io::Result<()> {
57 BorshSerialize::serialize(&self.auth, writer)?;
59
60 Ok(())
61 }
62}
63
64impl BorshDeserialize for Auth {
65 fn deserialize_reader<R: std::io::Read>(
66 reader: &mut R,
67 ) -> std::io::Result<Self> {
68 let auth = HashMap::<DigestIdentifier, HashSet<PublicKey>>::deserialize_reader(reader)?;
70 let network = None;
71 let our_key = Arc::new(PublicKey::default());
72
73 Ok(Self {
74 network,
75 auth,
76 our_key,
77 })
78 }
79}
80
81impl Auth {
82 async fn build_update_data(
83 ctx: &mut ActorContext<Self>,
84 subject_id: &DigestIdentifier,
85 ) -> Result<(HashSet<PublicKey>, Option<u64>), ActorError> {
86 let data = get_subject_data(ctx, subject_id).await?;
87
88 let (witnesses, actual_sn) = if let Some(data) = &data {
89 match data {
90 SubjectData::Tracker {
91 governance_id,
92 schema_id,
93 namespace,
94 ..
95 } => {
96 if let Some((creator, sn)) =
97 get_tracker_sn_creator(ctx, governance_id, subject_id)
98 .await?
99 {
100 let gov = get_gov(ctx, governance_id).await?;
101 let witnesses = gov
102 .get_witnesses(WitnessesData::Schema {
103 creator,
104 schema_id: schema_id.clone(),
105 namespace: Namespace::from(
106 namespace.to_owned(),
107 ),
108 })
109 .map_err(|e| {
110 error!(
111 subject_id = %subject_id,
112 governance_id = %governance_id,
113 error = %e,
114 "Failed to get witnesses for tracker schema"
115 );
116 ActorError::Functional {
117 description: e.to_string(),
118 }
119 })?;
120
121 (witnesses, Some(sn))
122 } else {
123 (HashSet::default(), None)
124 }
125 }
126 SubjectData::Governance { .. } => {
127 let gov = get_gov(ctx, subject_id).await?;
128 let witnesses =
129 gov.get_witnesses(WitnessesData::Gov).map_err(|e| {
130 warn!(
131 subject_id = %subject_id,
132 error = %e,
133 "Failed to get witnesses for governance"
134 );
135 ActorError::Functional {
136 description: e.to_string(),
137 }
138 })?;
139
140 let sn = get_gov_sn(ctx, subject_id).await?;
141
142 (witnesses, Some(sn))
143 }
144 }
145 } else {
146 (HashSet::default(), None)
147 };
148
149 Ok((witnesses, actual_sn))
150 }
151}
152
/// Messages accepted by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthMessage {
    /// Authorize `subject_id` with the given witnesses. The handler ignores
    /// the request when `subject_id` is empty.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// Ask for the identifiers of every authorized subject.
    GetAuths,
    /// Ask for the witnesses registered for `subject_id`; the handler answers
    /// with a functional error when the subject is not authorized.
    GetAuth {
        subject_id: DigestIdentifier,
    },
    /// Remove the authorization of `subject_id`.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
    /// Ask the witnesses of `subject_id` for an update. `objective`, when
    /// present, is added to the set of witnesses that will be asked.
    Update {
        subject_id: DigestIdentifier,
        objective: Option<PublicKey>,
    },
}

// Marker impl: lets `AuthMessage` be used as an actor message type.
impl Message for AuthMessage {}
173
/// Replies produced by the [`Auth`] actor.
#[derive(Debug, Clone)]
pub enum AuthResponse {
    /// Answer to `AuthMessage::GetAuths`: every authorized subject id.
    Auths { subjects: Vec<DigestIdentifier> },
    /// Answer to `AuthMessage::GetAuth`: the witnesses stored for the subject.
    Witnesses(HashSet<PublicKey>),
    /// Returned by the messages that carry no payload back
    /// (`NewAuth`, `DeleteAuth`, `Update`).
    None,
}

// Marker impl: lets `AuthResponse` be used as an actor response type.
impl Response for AuthResponse {}
182
/// Events persisted by the [`Auth`] actor; applying them mutates the `auth` map.
// NOTE(review): this enum derives Borsh, so variant order is presumably part of
// the persisted encoding — confirm before reordering or inserting variants.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum AuthEvent {
    /// Store (or overwrite) the witness set of `subject_id`.
    NewAuth {
        subject_id: DigestIdentifier,
        witness: AuthWitness,
    },
    /// Remove `subject_id` from the authorized subjects.
    DeleteAuth {
        subject_id: DigestIdentifier,
    },
}

// Marker impl: lets `AuthEvent` be used as an actor event type.
impl Event for AuthEvent {}
197
198#[async_trait]
199impl Actor for Auth {
200 type Event = AuthEvent;
201 type Message = AuthMessage;
202 type Response = AuthResponse;
203
204 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
205 parent_span.map_or_else(
206 || info_span!("Auth"),
207 |parent_span| info_span!(parent: parent_span, "Auth"),
208 )
209 }
210
211 async fn pre_start(
212 &mut self,
213 ctx: &mut ActorContext<Self>,
214 ) -> Result<(), ActorError> {
215 if let Err(e) = self.init_store("auth", None, false, ctx).await {
216 error!(
217 error = %e,
218 "Failed to initialize auth store"
219 );
220 return Err(e);
221 }
222 Ok(())
223 }
224}
225
226#[async_trait]
227impl Handler<Self> for Auth {
228 async fn handle_message(
229 &mut self,
230 _sender: ActorPath,
231 msg: AuthMessage,
232 ctx: &mut ave_actors::ActorContext<Self>,
233 ) -> Result<AuthResponse, ActorError> {
234 match msg {
235 AuthMessage::GetAuth { subject_id } => {
236 if let Some(witnesses) = self.auth.get(&subject_id) {
237 debug!(
238 msg_type = "GetAuth",
239 subject_id = %subject_id,
240 "Retrieved auth witnesses"
241 );
242
243 return Ok(AuthResponse::Witnesses(witnesses.clone()));
244 } else {
245 warn!(
246 msg_type = "GetAuth",
247 subject_id = %subject_id,
248 "Subject has not been authorized"
249 );
250 return Err(ActorError::Functional {
251 description: "The subject has not been authorized"
252 .to_owned(),
253 });
254 }
255 }
256 AuthMessage::DeleteAuth { subject_id } => {
257 self.on_event(
258 AuthEvent::DeleteAuth {
259 subject_id: subject_id.clone(),
260 },
261 ctx,
262 )
263 .await;
264
265 debug!(
266 msg_type = "DeleteAuth",
267 subject_id = %subject_id,
268 "Auth deleted successfully"
269 );
270 }
271 AuthMessage::NewAuth {
272 subject_id,
273 witness,
274 } => {
275 if !subject_id.is_empty() {
276 self.on_event(
277 AuthEvent::NewAuth {
278 subject_id: subject_id.clone(),
279 witness,
280 },
281 ctx,
282 )
283 .await;
284
285 debug!(
286 msg_type = "NewAuth",
287 subject_id = %subject_id,
288 "New auth created successfully"
289 );
290 }
291 }
292 AuthMessage::GetAuths => {
293 let subjects: Vec<DigestIdentifier> =
294 self.auth.keys().cloned().collect();
295 debug!(
296 msg_type = "GetAuths",
297 count = subjects.len(),
298 "Retrieved all authorized subjects"
299 );
300 return Ok(AuthResponse::Auths { subjects });
301 }
302 AuthMessage::Update {
303 subject_id,
304 objective,
305 } => {
306 let Some(network) = self.network.clone() else {
307 error!(
308 msg_type = "Update",
309 subject_id = %subject_id,
310 "Network is none"
311 );
312 return Err(ActorError::FunctionalCritical {
313 description: "network is none".to_string(),
314 });
315 };
316
317 let (witnesses, actual_sn) = {
318 let (mut witnesses, actual_sn) =
319 Self::build_update_data(ctx, &subject_id).await?;
320
321 if let Some(witness) = objective {
322 witnesses.insert(witness);
323 }
324
325 let auth_witnesses =
326 self.auth.get(&subject_id).cloned().unwrap_or_default();
327
328 let mut witnesses = witnesses
329 .union(&auth_witnesses)
330 .cloned()
331 .collect::<HashSet<PublicKey>>();
332 witnesses.remove(&self.our_key);
333
334 (witnesses, actual_sn)
335 };
336
337 if witnesses.is_empty() {
338 warn!(
339 msg_type = "Update",
340 subject_id = %subject_id,
341 "Subject has no witnesses to ask for update"
342 );
343 return Err(ActorError::Functional {
344 description: "The subject has no witnesses to try to ask for an update".to_owned(),
345 });
346 } else if witnesses.len() == 1 {
347 let objetive = witnesses.iter().next().expect("len is 1");
348 let info = ComunicateInfo {
349 receiver: objetive.clone(),
350 request_id: String::default(),
351 version: 0,
352 receiver_actor: format!(
353 "/user/node/distributor_{}",
354 subject_id
355 ),
356 };
357
358 if let Err(e) = network
359 .send_command(network::CommandHelper::SendMessage {
360 message: NetworkMessage {
361 info,
362 message: ActorMessage::DistributionLedgerReq {
363 actual_sn,
364 subject_id: subject_id.clone(),
365 },
366 },
367 })
368 .await
369 {
370 error!(
371 msg_type = "Update",
372 subject_id = %subject_id,
373 error = %e,
374 "Cannot send response to network"
375 );
376 return Err(emit_fail(ctx, e).await);
377 };
378
379 debug!(
380 msg_type = "Update",
381 subject_id = %subject_id,
382 "Update message sent to single witness"
383 );
384 } else {
385 let data = UpdateNew {
386 network,
387 subject_id: subject_id.clone(),
388 our_sn: actual_sn,
389 witnesses,
390 update_type: UpdateType::Auth,
391 };
392
393 let updater = Update::new(data);
394 if let Ok(child) =
395 ctx.create_child(&subject_id.to_string(), updater).await
396 {
397 if let Err(e) = child.tell(UpdateMessage::Run).await {
398 error!(
399 msg_type = "Update",
400 subject_id = %subject_id,
401 error = %e,
402 "Failed to send Run message to update actor"
403 );
404 return Err(emit_fail(ctx, e).await);
405 }
406
407 debug!(
408 msg_type = "Update",
409 subject_id = %subject_id,
410 "Update process initiated with multiple witnesses"
411 );
412 } else {
413 info!(
414 msg_type = "Update",
415 subject_id = %subject_id,
416 "An update is already in progress."
417 );
418 };
419 }
420 }
421 };
422
423 Ok(AuthResponse::None)
424 }
425
426 async fn on_event(
427 &mut self,
428 event: AuthEvent,
429 ctx: &mut ActorContext<Self>,
430 ) {
431 if let Err(e) = self.persist(&event, ctx).await {
432 error!(
433 event = ?event,
434 error = %e,
435 "Failed to persist auth event"
436 );
437 emit_fail(ctx, e).await;
438 }
439 }
440
441 async fn on_child_fault(
442 &mut self,
443 error: ActorError,
444 ctx: &mut ActorContext<Self>,
445 ) -> ChildAction {
446 error!(
447 error = %error,
448 "Child actor fault in auth"
449 );
450 emit_fail(ctx, error).await;
451 ChildAction::Stop
452 }
453}
454
455#[async_trait]
456impl PersistentActor for Auth {
457 type Persistence = LightPersistence;
458 type InitParams = (Arc<NetworkSender>, Arc<PublicKey>);
459
460 fn update(&mut self, state: Self) {
461 self.auth = state.auth;
462 }
463
464 fn create_initial(params: Self::InitParams) -> Self {
465 Self {
466 network: Some(params.0),
467 auth: HashMap::new(),
468 our_key: params.1,
469 }
470 }
471
472 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
474 match event {
475 AuthEvent::NewAuth {
476 subject_id,
477 witness,
478 } => {
479 let witnesses = match witness {
480 AuthWitness::One(public_key) => {
481 HashSet::from([public_key.clone()])
482 }
483 AuthWitness::Many(items) => items.iter().cloned().collect(),
484 AuthWitness::None => HashSet::default(),
485 };
486
487 self.auth.insert(subject_id.clone(), witnesses);
488 debug!(
489 event_type = "NewAuth",
490 subject_id = %subject_id,
491 "Applied new auth"
492 );
493 }
494 AuthEvent::DeleteAuth { subject_id } => {
495 self.auth.remove(subject_id);
496 debug!(
497 event_type = "DeleteAuth",
498 subject_id = %subject_id,
499 "Applied auth deletion"
500 );
501 }
502 };
503
504 Ok(())
505 }
506}
507
// Empty impl: `Auth` opts into `Storable` without overriding any of its items.
// NOTE(review): presumably this is what provides the store helpers used by the
// actor (e.g. `init_store`) — confirm against the `Storable` trait definition.
#[async_trait]
impl Storable for Auth {}