1use ave_actors::{
2 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler,
3};
4use std::future::Future;
5
6use ave_common::{
7 identity::{DigestIdentifier, PublicKey},
8 request::EventRequest,
9};
10
11use crate::{
12 approval::persist::{ApprPersist, ApprPersistMessage},
13 governance::{
14 Governance, GovernanceMessage, GovernanceResponse,
15 data::GovernanceData,
16 witnesses_register::{
17 TrackerDeliveryRange, WitnessesRegister, WitnessesRegisterMessage,
18 WitnessesRegisterResponse,
19 },
20 },
21 model::{
22 common::{
23 TrackerVisibilityState, check_subject_creation,
24 node::get_subject_data,
25 },
26 event::Ledger,
27 },
28 node::{
29 SubjectData,
30 subject_manager::{
31 SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
32 },
33 },
34 subject::Metadata,
35 tracker::{Tracker, TrackerMessage, TrackerResponse},
36};
37
38pub async fn get_gov<A>(
39 ctx: &mut ActorContext<A>,
40 governance_id: &DigestIdentifier,
41) -> Result<GovernanceData, ActorError>
42where
43 A: Actor + Handler<A>,
44{
45 let path = ActorPath::from(format!(
46 "/user/node/subject_manager/{}",
47 governance_id
48 ));
49 let governance_actor = ctx.system().get_actor::<Governance>(&path).await?;
50 let response = governance_actor
51 .ask(GovernanceMessage::GetGovernance)
52 .await?;
53
54 match response {
55 GovernanceResponse::Governance(gov_data) => Ok(*gov_data),
56 _ => Err(ActorError::UnexpectedResponse {
57 expected: "GovernanceResponse::Governance".to_owned(),
58 path,
59 }),
60 }
61}
62
63pub async fn up_subject<A>(
64 ctx: &mut ActorContext<A>,
65 subject_id: &DigestIdentifier,
66 requester: String,
67 create_ledger: Option<Ledger>,
68) -> Result<(), ActorError>
69where
70 A: Actor + Handler<A>,
71{
72 let path = ActorPath::from("/user/node/subject_manager");
73 let actor = ctx.system().get_actor::<SubjectManager>(&path).await?;
74 let response = actor
75 .ask(SubjectManagerMessage::Up {
76 subject_id: subject_id.clone(),
77 requester,
78 create_ledger: create_ledger.map(Box::new),
79 })
80 .await?;
81
82 match response {
83 SubjectManagerResponse::Up => Ok(()),
84 _ => Err(ActorError::UnexpectedResponse {
85 expected: "SubjectManagerResponse::Up".to_owned(),
86 path,
87 }),
88 }
89}
90
91pub async fn finish_subject<A>(
92 ctx: &mut ActorContext<A>,
93 subject_id: &DigestIdentifier,
94 requester: String,
95) -> Result<(), ActorError>
96where
97 A: Actor + Handler<A>,
98{
99 let path = ActorPath::from("/user/node/subject_manager");
100 let actor = ctx.system().get_actor::<SubjectManager>(&path).await?;
101 let response = actor
102 .ask(SubjectManagerMessage::Finish {
103 subject_id: subject_id.clone(),
104 requester,
105 })
106 .await?;
107
108 match response {
109 SubjectManagerResponse::Finish => Ok(()),
110 _ => Err(ActorError::UnexpectedResponse {
111 expected: "SubjectManagerResponse::Finish".to_owned(),
112 path,
113 }),
114 }
115}
116
117#[derive(Clone, Debug)]
118pub struct SubjectLease {
119 subject_id: DigestIdentifier,
120 requester: String,
121 active: bool,
122}
123
124impl SubjectLease {
125 pub const fn is_active(&self) -> bool {
126 self.active
127 }
128
129 pub async fn finish<A>(
130 self,
131 ctx: &mut ActorContext<A>,
132 ) -> Result<(), ActorError>
133 where
134 A: Actor + Handler<A>,
135 {
136 if self.active {
137 finish_subject(ctx, &self.subject_id, self.requester).await?;
138 }
139
140 Ok(())
141 }
142
143 pub async fn finish_if<A>(
144 self,
145 ctx: &mut ActorContext<A>,
146 should_finish: bool,
147 ) -> Result<(), ActorError>
148 where
149 A: Actor + Handler<A>,
150 {
151 if should_finish {
152 self.finish(ctx).await?;
153 }
154
155 Ok(())
156 }
157}
158
159pub async fn acquire_subject<A>(
160 ctx: &mut ActorContext<A>,
161 subject_id: &DigestIdentifier,
162 requester: String,
163 create_ledger: Option<Ledger>,
164 active: bool,
165) -> Result<SubjectLease, ActorError>
166where
167 A: Actor + Handler<A>,
168{
169 if active {
170 up_subject(ctx, subject_id, requester.clone(), create_ledger).await?;
171 }
172
173 Ok(SubjectLease {
174 subject_id: subject_id.clone(),
175 requester,
176 active,
177 })
178}
179
180pub async fn with_subject_up<A, F, Fut, T>(
181 ctx: &mut ActorContext<A>,
182 subject_id: &DigestIdentifier,
183 requester: String,
184 create_ledger: Option<Ledger>,
185 active: bool,
186 operation: F,
187) -> Result<T, ActorError>
188where
189 A: Actor + Handler<A>,
190 F: FnOnce(&mut ActorContext<A>) -> Fut,
191 Fut: Future<Output = Result<T, ActorError>>,
192{
193 let lease =
194 acquire_subject(ctx, subject_id, requester, create_ledger, active)
195 .await?;
196 let result = operation(ctx).await;
197 lease.finish(ctx).await?;
198 result
199}
200
201async fn get_subject_path_and_data<A>(
202 ctx: &mut ActorContext<A>,
203 subject_id: &DigestIdentifier,
204) -> Result<(ActorPath, SubjectData), ActorError>
205where
206 A: Actor + Handler<A>,
207{
208 let path =
209 ActorPath::from(format!("/user/node/subject_manager/{}", subject_id));
210 let Some(subject_data) = get_subject_data(ctx, subject_id).await? else {
211 return Err(ActorError::NotFound { path });
212 };
213
214 Ok((path, subject_data))
215}
216
217pub async fn get_metadata<A>(
218 ctx: &mut ActorContext<A>,
219 subject_id: &DigestIdentifier,
220) -> Result<Metadata, ActorError>
221where
222 A: Actor + Handler<A>,
223{
224 let (path, subject_data) =
225 get_subject_path_and_data(ctx, subject_id).await?;
226
227 match subject_data {
228 SubjectData::Tracker { .. } => {
229 let tracker_actor =
230 ctx.system().get_actor::<Tracker>(&path).await?;
231 let response =
232 tracker_actor.ask(TrackerMessage::GetMetadata).await?;
233 match response {
234 TrackerResponse::Metadata(metadata) => Ok(*metadata),
235 _ => Err(ActorError::UnexpectedResponse {
236 expected: "TrackerResponse::Metadata".to_owned(),
237 path,
238 }),
239 }
240 }
241 SubjectData::Governance { .. } => {
242 let governance_actor =
243 ctx.system().get_actor::<Governance>(&path).await?;
244 let response =
245 governance_actor.ask(GovernanceMessage::GetMetadata).await?;
246 match response {
247 GovernanceResponse::Metadata(metadata) => Ok(*metadata),
248 _ => Err(ActorError::UnexpectedResponse {
249 expected: "GovernanceResponse::Metadata".to_owned(),
250 path,
251 }),
252 }
253 }
254 }
255}
256
257pub async fn get_version<A>(
258 ctx: &mut ActorContext<A>,
259 governance_id: &DigestIdentifier,
260) -> Result<u64, ActorError>
261where
262 A: Actor + Handler<A>,
263{
264 let path = ActorPath::from(format!(
265 "/user/node/subject_manager/{}",
266 governance_id
267 ));
268 let actor = ctx.system().get_actor::<Governance>(&path).await?;
269 let response = actor.ask(GovernanceMessage::GetVersion).await?;
270
271 match response {
272 GovernanceResponse::Version(version) => Ok(version),
273 _ => Err(ActorError::UnexpectedResponse {
274 expected: "GovernanceResponse::Version".to_owned(),
275 path,
276 }),
277 }
278}
279
280pub async fn get_last_ledger_event<A>(
281 ctx: &mut ActorContext<A>,
282 subject_id: &DigestIdentifier,
283) -> Result<Option<Ledger>, ActorError>
284where
285 A: Actor + Handler<A>,
286{
287 let (path, subject_data) =
288 get_subject_path_and_data(ctx, subject_id).await?;
289
290 match subject_data {
291 SubjectData::Tracker { .. } => {
292 let tracker_actor =
293 ctx.system().get_actor::<Tracker>(&path).await?;
294 let response =
295 tracker_actor.ask(TrackerMessage::GetLastLedger).await?;
296 match response {
297 TrackerResponse::LastLedger { ledger_event } => {
298 Ok(*ledger_event)
299 }
300 _ => Err(ActorError::UnexpectedResponse {
301 path,
302 expected: "TrackerResponse::LastLedger".to_owned(),
303 }),
304 }
305 }
306 SubjectData::Governance { .. } => {
307 let governance_actor =
308 ctx.system().get_actor::<Governance>(&path).await?;
309 let response = governance_actor
310 .ask(GovernanceMessage::GetLastLedger)
311 .await?;
312 match response {
313 GovernanceResponse::LastLedger { ledger_event } => {
314 Ok(*ledger_event)
315 }
316 _ => Err(ActorError::UnexpectedResponse {
317 path,
318 expected: "GovernanceResponse::LastLedger".to_owned(),
319 }),
320 }
321 }
322 }
323}
324
325pub async fn update_ledger<A>(
326 ctx: &mut ActorContext<A>,
327 subject_id: &DigestIdentifier,
328 events: Vec<Ledger>,
329) -> Result<(u64, PublicKey, Option<PublicKey>), ActorError>
330where
331 A: Actor + Handler<A>,
332{
333 let (path, subject_data) =
334 get_subject_path_and_data(ctx, subject_id).await?;
335
336 match subject_data {
337 SubjectData::Tracker { .. } => {
338 let tracker_actor =
339 ctx.system().get_actor::<Tracker>(&path).await?;
340 let response = tracker_actor
341 .ask(TrackerMessage::UpdateLedger { events })
342 .await?;
343 match response {
344 TrackerResponse::UpdateResult(last_sn, owner, new_owner) => {
345 Ok((last_sn, owner, new_owner))
346 }
347 _ => Err(ActorError::UnexpectedResponse {
348 path,
349 expected: "TrackerResponse::UpdateResult".to_owned(),
350 }),
351 }
352 }
353 SubjectData::Governance { .. } => {
354 let governance_actor =
355 ctx.system().get_actor::<Governance>(&path).await?;
356 let response = governance_actor
357 .ask(GovernanceMessage::UpdateLedger { events })
358 .await?;
359 match response {
360 GovernanceResponse::UpdateResult(last_sn, owner, new_owner) => {
361 Ok((last_sn, owner, new_owner))
362 }
363 _ => Err(ActorError::UnexpectedResponse {
364 path,
365 expected: "GovernanceResponse::UpdateResult".to_owned(),
366 }),
367 }
368 }
369 }
370}
371
372pub async fn create_subject<A>(
373 ctx: &mut ActorContext<A>,
374 ledger: Ledger,
375) -> Result<(), ActorError>
376where
377 A: Actor + Handler<A>,
378{
379 let mut should_finish = true;
380 if ledger.get_event_request_type().is_create_event()
381 && let EventRequest::Create(request) = ledger
382 .get_event_request()
383 .ok_or_else(|| ActorError::Functional {
384 description: "Can not obtain create event request".to_string(),
385 })?
386 {
387 if request.schema_id.is_gov() {
388 should_finish = false;
389 } else {
390 check_subject_creation(
391 ctx,
392 &request.governance_id,
393 ledger.ledger_seal_signature.signer.clone(),
394 ledger.gov_version,
395 request.namespace.to_string(),
396 request.schema_id,
397 )
398 .await?;
399 }
400 }
401
402 let subject_id = ledger.get_subject_id();
403 let requester = ctx.path().to_string();
404 let lease =
405 acquire_subject(ctx, &subject_id, requester, Some(ledger), true)
406 .await?;
407 lease.finish_if(ctx, should_finish).await?;
408
409 Ok(())
410}
411
412pub async fn get_gov_sn<A>(
413 ctx: &mut ActorContext<A>,
414 governance_id: &DigestIdentifier,
415) -> Result<u64, ActorError>
416where
417 A: Actor + Handler<A>,
418{
419 let actor_path = ActorPath::from(format!(
420 "/user/node/subject_manager/{}/witnesses_register",
421 governance_id
422 ));
423
424 let actor: ActorRef<WitnessesRegister> =
425 ctx.system().get_actor(&actor_path).await?;
426
427 let response = actor.ask(WitnessesRegisterMessage::GetSnGov).await?;
428
429 match response {
430 WitnessesRegisterResponse::GovSn { sn } => Ok(sn),
431 _ => Err(ActorError::UnexpectedResponse {
432 path: actor_path,
433 expected: "WitnessesRegisterResponse::GovSn".to_string(),
434 }),
435 }
436}
437
438pub async fn get_tracker_sn_owner<A>(
439 ctx: &mut ActorContext<A>,
440 governance_id: &DigestIdentifier,
441 subject_id: &DigestIdentifier,
442) -> Result<Option<(PublicKey, u64)>, ActorError>
443where
444 A: Actor + Handler<A>,
445{
446 let actor_path = ActorPath::from(format!(
447 "/user/node/subject_manager/{}/witnesses_register",
448 governance_id
449 ));
450
451 let actor: ActorRef<WitnessesRegister> =
452 ctx.system().get_actor(&actor_path).await?;
453
454 let response = actor
455 .ask(WitnessesRegisterMessage::GetTrackerSnOwner {
456 subject_id: subject_id.clone(),
457 })
458 .await?;
459
460 match response {
461 WitnessesRegisterResponse::TrackerOwnerSn { data } => Ok(data),
462 _ => Err(ActorError::UnexpectedResponse {
463 path: actor_path,
464 expected: "WitnessesRegisterResponse::TrackerSn".to_string(),
465 }),
466 }
467}
468
469pub async fn get_tracker_visibility_state<A>(
470 ctx: &mut ActorContext<A>,
471 governance_id: &DigestIdentifier,
472 subject_id: &DigestIdentifier,
473) -> Result<TrackerVisibilityState, ActorError>
474where
475 A: Actor + Handler<A>,
476{
477 let actor_path = ActorPath::from(format!(
478 "/user/node/subject_manager/{}/witnesses_register",
479 governance_id
480 ));
481
482 let actor: ActorRef<WitnessesRegister> =
483 ctx.system().get_actor(&actor_path).await?;
484
485 let response = actor
486 .ask(WitnessesRegisterMessage::GetTrackerVisibilityState {
487 subject_id: subject_id.clone(),
488 })
489 .await?;
490
491 match response {
492 WitnessesRegisterResponse::TrackerVisibilityState { state } => {
493 Ok(state)
494 }
495 _ => Err(ActorError::UnexpectedResponse {
496 path: actor_path,
497 expected: "WitnessesRegisterResponse::TrackerVisibilityState"
498 .to_string(),
499 }),
500 }
501}
502
503pub async fn get_local_subject_sn<A>(
504 ctx: &mut ActorContext<A>,
505 subject_id: &DigestIdentifier,
506) -> Result<Option<u64>, ActorError>
507where
508 A: Actor + Handler<A>,
509{
510 let Some(subject_data) = get_subject_data(ctx, subject_id).await? else {
511 return Ok(None);
512 };
513
514 match subject_data {
515 SubjectData::Tracker { governance_id, .. } => {
516 Ok(get_tracker_sn_owner(ctx, &governance_id, subject_id)
517 .await?
518 .map(|(_, sn)| sn))
519 }
520 SubjectData::Governance { .. } => {
521 Ok(Some(get_gov_sn(ctx, subject_id).await?))
522 }
523 }
524}
525
526pub async fn get_tracker_window<A>(
527 ctx: &mut ActorContext<A>,
528 governance_id: &DigestIdentifier,
529 subject_id: &DigestIdentifier,
530 node: PublicKey,
531 namespace: String,
532 schema_id: ave_common::SchemaType,
533 actual_sn: Option<u64>,
534) -> Result<
535 (Option<u64>, Option<u64>, bool, Vec<TrackerDeliveryRange>),
536 ActorError,
537>
538where
539 A: Actor + Handler<A>,
540{
541 let actor_path = ActorPath::from(format!(
542 "/user/node/subject_manager/{}/witnesses_register",
543 governance_id
544 ));
545
546 let actor: ActorRef<WitnessesRegister> =
547 ctx.system().get_actor(&actor_path).await?;
548
549 let response = actor
550 .ask(WitnessesRegisterMessage::GetTrackerWindow {
551 subject_id: subject_id.clone(),
552 node,
553 namespace,
554 schema_id,
555 actual_sn,
556 })
557 .await?;
558
559 match response {
560 WitnessesRegisterResponse::TrackerWindow {
561 sn,
562 clear_sn,
563 is_all,
564 ranges,
565 } => Ok((sn, clear_sn, is_all, ranges)),
566 _ => Err(ActorError::UnexpectedResponse {
567 path: actor_path,
568 expected: "WitnessesRegisterResponse::TrackerWindow".to_string(),
569 }),
570 }
571}
572
573pub async fn make_obsolete<A>(
574 ctx: &mut ActorContext<A>,
575 governance_id: &DigestIdentifier,
576) -> Result<(), ActorError>
577where
578 A: Actor + Handler<A>,
579{
580 let actor_path = ActorPath::from(format!(
581 "/user/node/subject_manager/{}/approver",
582 governance_id
583 ));
584
585 let actor: ActorRef<ApprPersist> =
586 ctx.system().get_actor(&actor_path).await?;
587
588 actor.tell(ApprPersistMessage::MakeObsolete).await
589}