Skip to main content

ave_core/model/common/
subject.rs

1use ave_actors::{
2    Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
3};
4use std::future::Future;
5
6use ave_common::{
7    identity::{DigestIdentifier, PublicKey},
8    request::EventRequest,
9};
10
11use crate::{
12    approval::persist::{ApprPersist, ApprPersistMessage},
13    governance::{
14        Governance, GovernanceMessage, GovernanceResponse,
15        data::GovernanceData,
16        witnesses_register::{
17            TrackerDeliveryRange, WitnessesRegister, WitnessesRegisterMessage,
18            WitnessesRegisterResponse,
19        },
20    },
21    model::{
22        common::{
23            TrackerVisibilityState, check_subject_creation,
24            node::get_subject_data,
25        },
26        event::Ledger,
27    },
28    node::{
29        SubjectData,
30        subject_manager::{
31            SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
32        },
33    },
34    subject::Metadata,
35    tracker::{Tracker, TrackerMessage, TrackerResponse},
36};
37
38pub async fn get_gov<A>(
39    ctx: &mut ActorContext<A>,
40    governance_id: &DigestIdentifier,
41) -> Result<GovernanceData, ActorError>
42where
43    A: Actor + Handler<A>,
44{
45    let path = ActorPath::from(format!(
46        "/user/node/subject_manager/{}",
47        governance_id
48    ));
49    let governance_actor = ctx.system().get_actor::<Governance>(&path).await?;
50    let response = governance_actor
51        .ask(GovernanceMessage::GetGovernance)
52        .await?;
53
54    match response {
55        GovernanceResponse::Governance(gov_data) => Ok(*gov_data),
56        _ => Err(ActorError::UnexpectedResponse {
57            expected: "GovernanceResponse::Governance".to_owned(),
58            path,
59        }),
60    }
61}
62
63pub async fn up_subject<A>(
64    ctx: &mut ActorContext<A>,
65    subject_id: &DigestIdentifier,
66    requester: String,
67    create_ledger: Option<Ledger>,
68) -> Result<(), ActorError>
69where
70    A: Actor + Handler<A>,
71{
72    let path = ActorPath::from("/user/node/subject_manager");
73    let actor = ctx.system().get_actor::<SubjectManager>(&path).await?;
74    let response = actor
75        .ask(SubjectManagerMessage::Up {
76            subject_id: subject_id.clone(),
77            requester,
78            create_ledger: create_ledger.map(Box::new),
79        })
80        .await?;
81
82    match response {
83        SubjectManagerResponse::Up => Ok(()),
84        _ => Err(ActorError::UnexpectedResponse {
85            expected: "SubjectManagerResponse::Up".to_owned(),
86            path,
87        }),
88    }
89}
90
91pub async fn finish_subject<A>(
92    ctx: &mut ActorContext<A>,
93    subject_id: &DigestIdentifier,
94    requester: String,
95) -> Result<(), ActorError>
96where
97    A: Actor + Handler<A>,
98{
99    let path = ActorPath::from("/user/node/subject_manager");
100    let actor = ctx.system().get_actor::<SubjectManager>(&path).await?;
101    let response = actor
102        .ask(SubjectManagerMessage::Finish {
103            subject_id: subject_id.clone(),
104            requester,
105        })
106        .await?;
107
108    match response {
109        SubjectManagerResponse::Finish => Ok(()),
110        _ => Err(ActorError::UnexpectedResponse {
111            expected: "SubjectManagerResponse::Finish".to_owned(),
112            path,
113        }),
114    }
115}
116
117#[derive(Clone, Debug)]
118pub struct SubjectLease {
119    subject_id: DigestIdentifier,
120    requester: String,
121    active: bool,
122}
123
124impl SubjectLease {
125    pub const fn is_active(&self) -> bool {
126        self.active
127    }
128
129    pub async fn finish<A>(
130        self,
131        ctx: &mut ActorContext<A>,
132    ) -> Result<(), ActorError>
133    where
134        A: Actor + Handler<A>,
135    {
136        if self.active {
137            finish_subject(ctx, &self.subject_id, self.requester).await?;
138        }
139
140        Ok(())
141    }
142
143    pub async fn finish_if<A>(
144        self,
145        ctx: &mut ActorContext<A>,
146        should_finish: bool,
147    ) -> Result<(), ActorError>
148    where
149        A: Actor + Handler<A>,
150    {
151        if should_finish {
152            self.finish(ctx).await?;
153        }
154
155        Ok(())
156    }
157}
158
159pub async fn acquire_subject<A>(
160    ctx: &mut ActorContext<A>,
161    subject_id: &DigestIdentifier,
162    requester: String,
163    create_ledger: Option<Ledger>,
164    active: bool,
165) -> Result<SubjectLease, ActorError>
166where
167    A: Actor + Handler<A>,
168{
169    if active {
170        up_subject(ctx, subject_id, requester.clone(), create_ledger).await?;
171    }
172
173    Ok(SubjectLease {
174        subject_id: subject_id.clone(),
175        requester,
176        active,
177    })
178}
179
180pub async fn with_subject_up<A, F, Fut, T>(
181    ctx: &mut ActorContext<A>,
182    subject_id: &DigestIdentifier,
183    requester: String,
184    create_ledger: Option<Ledger>,
185    active: bool,
186    operation: F,
187) -> Result<T, ActorError>
188where
189    A: Actor + Handler<A>,
190    F: FnOnce(&mut ActorContext<A>) -> Fut,
191    Fut: Future<Output = Result<T, ActorError>>,
192{
193    let lease =
194        acquire_subject(ctx, subject_id, requester, create_ledger, active)
195            .await?;
196    let result = operation(ctx).await;
197    lease.finish(ctx).await?;
198    result
199}
200
201async fn get_subject_path_and_data<A>(
202    ctx: &mut ActorContext<A>,
203    subject_id: &DigestIdentifier,
204) -> Result<(ActorPath, SubjectData), ActorError>
205where
206    A: Actor + Handler<A>,
207{
208    let path =
209        ActorPath::from(format!("/user/node/subject_manager/{}", subject_id));
210    let Some(subject_data) = get_subject_data(ctx, subject_id).await? else {
211        return Err(ActorError::NotFound { path });
212    };
213
214    Ok((path, subject_data))
215}
216
217pub async fn get_metadata<A>(
218    ctx: &mut ActorContext<A>,
219    subject_id: &DigestIdentifier,
220) -> Result<Metadata, ActorError>
221where
222    A: Actor + Handler<A>,
223{
224    let (path, subject_data) =
225        get_subject_path_and_data(ctx, subject_id).await?;
226
227    match subject_data {
228        SubjectData::Tracker { .. } => {
229            let tracker_actor =
230                ctx.system().get_actor::<Tracker>(&path).await?;
231            let response =
232                tracker_actor.ask(TrackerMessage::GetMetadata).await?;
233            match response {
234                TrackerResponse::Metadata(metadata) => Ok(*metadata),
235                _ => Err(ActorError::UnexpectedResponse {
236                    expected: "TrackerResponse::Metadata".to_owned(),
237                    path,
238                }),
239            }
240        }
241        SubjectData::Governance { .. } => {
242            let governance_actor =
243                ctx.system().get_actor::<Governance>(&path).await?;
244            let response =
245                governance_actor.ask(GovernanceMessage::GetMetadata).await?;
246            match response {
247                GovernanceResponse::Metadata(metadata) => Ok(*metadata),
248                _ => Err(ActorError::UnexpectedResponse {
249                    expected: "GovernanceResponse::Metadata".to_owned(),
250                    path,
251                }),
252            }
253        }
254    }
255}
256
257pub async fn get_version<A>(
258    ctx: &mut ActorContext<A>,
259    governance_id: &DigestIdentifier,
260) -> Result<u64, ActorError>
261where
262    A: Actor + Handler<A>,
263{
264    let path = ActorPath::from(format!(
265        "/user/node/subject_manager/{}",
266        governance_id
267    ));
268    let actor = ctx.system().get_actor::<Governance>(&path).await?;
269    let response = actor.ask(GovernanceMessage::GetVersion).await?;
270
271    match response {
272        GovernanceResponse::Version(version) => Ok(version),
273        _ => Err(ActorError::UnexpectedResponse {
274            expected: "GovernanceResponse::Version".to_owned(),
275            path,
276        }),
277    }
278}
279
280pub async fn get_last_ledger_event<A>(
281    ctx: &mut ActorContext<A>,
282    subject_id: &DigestIdentifier,
283) -> Result<Option<Ledger>, ActorError>
284where
285    A: Actor + Handler<A>,
286{
287    let (path, subject_data) =
288        get_subject_path_and_data(ctx, subject_id).await?;
289
290    match subject_data {
291        SubjectData::Tracker { .. } => {
292            let tracker_actor =
293                ctx.system().get_actor::<Tracker>(&path).await?;
294            let response =
295                tracker_actor.ask(TrackerMessage::GetLastLedger).await?;
296            match response {
297                TrackerResponse::LastLedger { ledger_event } => {
298                    Ok(*ledger_event)
299                }
300                _ => Err(ActorError::UnexpectedResponse {
301                    path,
302                    expected: "TrackerResponse::LastLedger".to_owned(),
303                }),
304            }
305        }
306        SubjectData::Governance { .. } => {
307            let governance_actor =
308                ctx.system().get_actor::<Governance>(&path).await?;
309            let response = governance_actor
310                .ask(GovernanceMessage::GetLastLedger)
311                .await?;
312            match response {
313                GovernanceResponse::LastLedger { ledger_event } => {
314                    Ok(*ledger_event)
315                }
316                _ => Err(ActorError::UnexpectedResponse {
317                    path,
318                    expected: "GovernanceResponse::LastLedger".to_owned(),
319                }),
320            }
321        }
322    }
323}
324
325pub async fn update_ledger<A>(
326    ctx: &mut ActorContext<A>,
327    subject_id: &DigestIdentifier,
328    events: Vec<Ledger>,
329) -> Result<(u64, PublicKey, Option<PublicKey>), ActorError>
330where
331    A: Actor + Handler<A>,
332{
333    let (path, subject_data) =
334        get_subject_path_and_data(ctx, subject_id).await?;
335
336    match subject_data {
337        SubjectData::Tracker { .. } => {
338            let tracker_actor =
339                ctx.system().get_actor::<Tracker>(&path).await?;
340            let response = tracker_actor
341                .ask(TrackerMessage::UpdateLedger { events })
342                .await?;
343            match response {
344                TrackerResponse::UpdateResult(last_sn, owner, new_owner) => {
345                    Ok((last_sn, owner, new_owner))
346                }
347                _ => Err(ActorError::UnexpectedResponse {
348                    path,
349                    expected: "TrackerResponse::UpdateResult".to_owned(),
350                }),
351            }
352        }
353        SubjectData::Governance { .. } => {
354            let governance_actor =
355                ctx.system().get_actor::<Governance>(&path).await?;
356            let response = governance_actor
357                .ask(GovernanceMessage::UpdateLedger { events })
358                .await?;
359            match response {
360                GovernanceResponse::UpdateResult(last_sn, owner, new_owner) => {
361                    Ok((last_sn, owner, new_owner))
362                }
363                _ => Err(ActorError::UnexpectedResponse {
364                    path,
365                    expected: "GovernanceResponse::UpdateResult".to_owned(),
366                }),
367            }
368        }
369    }
370}
371
372pub async fn create_subject<A>(
373    ctx: &mut ActorContext<A>,
374    ledger: Ledger,
375) -> Result<(), ActorError>
376where
377    A: Actor + Handler<A>,
378{
379    let mut should_finish = true;
380    if ledger.get_event_request_type().is_create_event()
381        && let EventRequest::Create(request) = ledger
382            .get_event_request()
383            .ok_or_else(|| ActorError::Functional {
384                description: "Can not obtain create event request".to_string(),
385            })?
386    {
387        if request.schema_id.is_gov() {
388            should_finish = false;
389        } else {
390            check_subject_creation(
391                ctx,
392                &request.governance_id,
393                ledger.ledger_seal_signature.signer.clone(),
394                ledger.gov_version,
395                request.namespace.to_string(),
396                request.schema_id,
397            )
398            .await?;
399        }
400    }
401
402    let subject_id = ledger.get_subject_id();
403    let requester = ctx.path().to_string();
404    let lease =
405        acquire_subject(ctx, &subject_id, requester, Some(ledger), true)
406            .await?;
407    lease.finish_if(ctx, should_finish).await?;
408
409    Ok(())
410}
411
412pub async fn get_gov_sn<A>(
413    ctx: &mut ActorContext<A>,
414    governance_id: &DigestIdentifier,
415) -> Result<u64, ActorError>
416where
417    A: Actor + Handler<A>,
418{
419    let actor_path = ActorPath::from(format!(
420        "/user/node/subject_manager/{}/witnesses_register",
421        governance_id
422    ));
423
424    let actor: ActorRef<WitnessesRegister> =
425        ctx.system().get_actor(&actor_path).await?;
426
427    let response = actor.ask(WitnessesRegisterMessage::GetSnGov).await?;
428
429    match response {
430        WitnessesRegisterResponse::GovSn { sn } => Ok(sn),
431        _ => Err(ActorError::UnexpectedResponse {
432            path: actor_path,
433            expected: "WitnessesRegisterResponse::GovSn".to_string(),
434        }),
435    }
436}
437
438pub async fn get_tracker_sn_owner<A>(
439    ctx: &mut ActorContext<A>,
440    governance_id: &DigestIdentifier,
441    subject_id: &DigestIdentifier,
442) -> Result<Option<(PublicKey, u64)>, ActorError>
443where
444    A: Actor + Handler<A>,
445{
446    let actor_path = ActorPath::from(format!(
447        "/user/node/subject_manager/{}/witnesses_register",
448        governance_id
449    ));
450
451    let actor: ActorRef<WitnessesRegister> =
452        ctx.system().get_actor(&actor_path).await?;
453
454    let response = actor
455        .ask(WitnessesRegisterMessage::GetTrackerSnOwner {
456            subject_id: subject_id.clone(),
457        })
458        .await?;
459
460    match response {
461        WitnessesRegisterResponse::TrackerOwnerSn { data } => Ok(data),
462        _ => Err(ActorError::UnexpectedResponse {
463            path: actor_path,
464            expected: "WitnessesRegisterResponse::TrackerSn".to_string(),
465        }),
466    }
467}
468
469pub async fn get_tracker_visibility_state<A>(
470    ctx: &mut ActorContext<A>,
471    governance_id: &DigestIdentifier,
472    subject_id: &DigestIdentifier,
473) -> Result<TrackerVisibilityState, ActorError>
474where
475    A: Actor + Handler<A>,
476{
477    let actor_path = ActorPath::from(format!(
478        "/user/node/subject_manager/{}/witnesses_register",
479        governance_id
480    ));
481
482    let actor: ActorRef<WitnessesRegister> =
483        ctx.system().get_actor(&actor_path).await?;
484
485    let response = actor
486        .ask(WitnessesRegisterMessage::GetTrackerVisibilityState {
487            subject_id: subject_id.clone(),
488        })
489        .await?;
490
491    match response {
492        WitnessesRegisterResponse::TrackerVisibilityState { state } => {
493            Ok(state)
494        }
495        _ => Err(ActorError::UnexpectedResponse {
496            path: actor_path,
497            expected: "WitnessesRegisterResponse::TrackerVisibilityState"
498                .to_string(),
499        }),
500    }
501}
502
503pub async fn get_local_subject_sn<A>(
504    ctx: &mut ActorContext<A>,
505    subject_id: &DigestIdentifier,
506) -> Result<Option<u64>, ActorError>
507where
508    A: Actor + Handler<A>,
509{
510    let Some(subject_data) = get_subject_data(ctx, subject_id).await? else {
511        return Ok(None);
512    };
513
514    match subject_data {
515        SubjectData::Tracker { governance_id, .. } => {
516            Ok(get_tracker_sn_owner(ctx, &governance_id, subject_id)
517                .await?
518                .map(|(_, sn)| sn))
519        }
520        SubjectData::Governance { .. } => {
521            Ok(Some(get_gov_sn(ctx, subject_id).await?))
522        }
523    }
524}
525
526pub async fn get_tracker_window<A>(
527    ctx: &mut ActorContext<A>,
528    governance_id: &DigestIdentifier,
529    subject_id: &DigestIdentifier,
530    node: PublicKey,
531    namespace: String,
532    schema_id: ave_common::SchemaType,
533    actual_sn: Option<u64>,
534) -> Result<
535    (Option<u64>, Option<u64>, bool, Vec<TrackerDeliveryRange>),
536    ActorError,
537>
538where
539    A: Actor + Handler<A>,
540{
541    let actor_path = ActorPath::from(format!(
542        "/user/node/subject_manager/{}/witnesses_register",
543        governance_id
544    ));
545
546    let actor: ActorRef<WitnessesRegister> =
547        ctx.system().get_actor(&actor_path).await?;
548
549    let response = actor
550        .ask(WitnessesRegisterMessage::GetTrackerWindow {
551            subject_id: subject_id.clone(),
552            node,
553            namespace,
554            schema_id,
555            actual_sn,
556        })
557        .await?;
558
559    match response {
560        WitnessesRegisterResponse::TrackerWindow {
561            sn,
562            clear_sn,
563            is_all,
564            ranges,
565        } => Ok((sn, clear_sn, is_all, ranges)),
566        _ => Err(ActorError::UnexpectedResponse {
567            path: actor_path,
568            expected: "WitnessesRegisterResponse::TrackerWindow".to_string(),
569        }),
570    }
571}
572
573pub async fn make_obsolete<A>(
574    ctx: &mut ActorContext<A>,
575    governance_id: &DigestIdentifier,
576) -> Result<(), ActorError>
577where
578    A: Actor + Handler<A>,
579{
580    let actor_path = ActorPath::from(format!(
581        "/user/node/subject_manager/{}/approver",
582        governance_id
583    ));
584
585    let actor: ActorRef<ApprPersist> =
586        ctx.system().get_actor(&actor_path).await?;
587
588    actor.tell(ApprPersistMessage::MakeObsolete).await
589}