augrim/two_phase_commit/
unified_algorithm.rs

1// Copyright 2021-2022 Cargill Incorporated
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::algorithm::{Algorithm, Value};
16use crate::error::AlgorithmError;
17use crate::process::Process;
18use crate::time::TimeSource;
19
20use super::CoordinatorAlgorithm;
21use super::ParticipantAlgorithm;
22use super::TwoPhaseCommitAction;
23use super::TwoPhaseCommitContext;
24use super::TwoPhaseCommitEvent;
25
26pub struct TwoPhaseCommitAlgorithm<P, V, TS>
27where
28    P: Process,
29    V: Value,
30    TS: TimeSource,
31{
32    coordinator: CoordinatorAlgorithm<P, V, TS>,
33    participant: ParticipantAlgorithm<P, V, TS>,
34}
35
36impl<P, V, TS> TwoPhaseCommitAlgorithm<P, V, TS>
37where
38    P: Process,
39    V: Value,
40    TS: TimeSource + Clone,
41{
42    pub fn new(time_source: TS) -> Self {
43        Self {
44            coordinator: CoordinatorAlgorithm::new(time_source.clone()),
45            participant: ParticipantAlgorithm::new(time_source),
46        }
47    }
48}
49
50impl<P, V, TS> Algorithm for TwoPhaseCommitAlgorithm<P, V, TS>
51where
52    P: Process,
53    V: Value,
54    TS: TimeSource,
55{
56    type Event = TwoPhaseCommitEvent<P, V>;
57    type Action = TwoPhaseCommitAction<P, V, TS::Time>;
58    type Context = TwoPhaseCommitContext<P, TS::Time>;
59
60    fn event(
61        &self,
62        event: Self::Event,
63        context: Self::Context,
64    ) -> Result<Vec<Self::Action>, AlgorithmError> {
65        if context.coordinator() == context.this_process() {
66            self.coordinator
67                .event(event.try_into()?, context.try_into()?)
68                .map(|v| v.into_iter().map(|a| a.into()).collect())
69        } else {
70            self.participant
71                .event(event.try_into()?, context.try_into()?)
72                .map(|v| v.into_iter().map(|a| a.into()).collect())
73        }
74    }
75}