augrim/two_phase_commit/
unified_algorithm.rs1use crate::algorithm::{Algorithm, Value};
16use crate::error::AlgorithmError;
17use crate::process::Process;
18use crate::time::TimeSource;
19
20use super::CoordinatorAlgorithm;
21use super::ParticipantAlgorithm;
22use super::TwoPhaseCommitAction;
23use super::TwoPhaseCommitContext;
24use super::TwoPhaseCommitEvent;
25
26pub struct TwoPhaseCommitAlgorithm<P, V, TS>
27where
28 P: Process,
29 V: Value,
30 TS: TimeSource,
31{
32 coordinator: CoordinatorAlgorithm<P, V, TS>,
33 participant: ParticipantAlgorithm<P, V, TS>,
34}
35
36impl<P, V, TS> TwoPhaseCommitAlgorithm<P, V, TS>
37where
38 P: Process,
39 V: Value,
40 TS: TimeSource + Clone,
41{
42 pub fn new(time_source: TS) -> Self {
43 Self {
44 coordinator: CoordinatorAlgorithm::new(time_source.clone()),
45 participant: ParticipantAlgorithm::new(time_source),
46 }
47 }
48}
49
50impl<P, V, TS> Algorithm for TwoPhaseCommitAlgorithm<P, V, TS>
51where
52 P: Process,
53 V: Value,
54 TS: TimeSource,
55{
56 type Event = TwoPhaseCommitEvent<P, V>;
57 type Action = TwoPhaseCommitAction<P, V, TS::Time>;
58 type Context = TwoPhaseCommitContext<P, TS::Time>;
59
60 fn event(
61 &self,
62 event: Self::Event,
63 context: Self::Context,
64 ) -> Result<Vec<Self::Action>, AlgorithmError> {
65 if context.coordinator() == context.this_process() {
66 self.coordinator
67 .event(event.try_into()?, context.try_into()?)
68 .map(|v| v.into_iter().map(|a| a.into()).collect())
69 } else {
70 self.participant
71 .event(event.try_into()?, context.try_into()?)
72 .map(|v| v.into_iter().map(|a| a.into()).collect())
73 }
74 }
75}