thingvellir/shard/
commit_policy.rs

1use tokio::time::Duration;
2
3use crate::UpstreamError;
4use smallvec::SmallVec;
5
6pub(crate) enum InnerCommitPolicy {
7    Noop,
8    Immediate(ImmediateCommitWaiter),
9    Within(Duration),
10}
11
/// The commit policy that is substituted when a mutation asks for the default.
///
/// There is deliberately no no-op variant: a default is always either immediate or bounded
/// by a delay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DefaultCommitPolicy {
    /// The data must be persisted successfully before the execution returns.
    Immediate,
    /// The data should try to be persisted with at most a `Duration` delay.
    Within(Duration),
}
19
20pub(super) enum CommitPolicy {
21    /// The data should persist with the default persistence policy of the service.
22    Default,
23    /// The mutation was a no-op, and no additional persistence should be enqueued.
24    Noop,
25    /// The data must be persisted successfully before the execution returns.
26    Immediate,
27    /// The data should try to be persisted with at most a `Duration` delay.
28    Within(Duration),
29}
30
31pub(super) enum CommitPolicyNoDefault {
32    Noop,
33    Immediate,
34    Within(Duration),
35}
36
37impl CommitPolicy {
38    pub(super) fn apply_default(
39        self,
40        default_commit_policy: DefaultCommitPolicy,
41    ) -> CommitPolicyNoDefault {
42        match self {
43            CommitPolicy::Default => default_commit_policy.into(),
44            CommitPolicy::Noop => CommitPolicyNoDefault::Noop,
45            CommitPolicy::Within(duration) => CommitPolicyNoDefault::Within(duration),
46            CommitPolicy::Immediate => CommitPolicyNoDefault::Immediate,
47        }
48    }
49}
50
51/// Specifies how the `execute_mut` should be committed to the upstream.
52///
53/// This allows you to control the durability of your data mutation.
54///
55/// [`execute_mut`]: ServiceHandle::execute_mut
56pub struct Commit<Data>(CommitPolicy, Data);
57
58impl<Data> Commit<Data> {
59    /// Persist the item immediately. The execution future will not resolve until your write has been been
60    /// acknowledged by the upstream layer.
61    pub fn immediately(data: Data) -> Self {
62        Self(CommitPolicy::Immediate, data)
63    }
64
65    /// Persist the data, using the default persistence policy for the service.
66    pub fn default(data: Data) -> Self {
67        Self(CommitPolicy::Default, data)
68    }
69
70    /// Ensure that the data is persisted within a given duration.
71    pub fn within(data: Data, duration: Duration) -> Self {
72        Self(CommitPolicy::Within(duration), data)
73    }
74
75    /// The mutation resulted in a no-op. Do not try to persist the data in any way.
76    ///
77    /// # Safety
78    ///
79    /// This function is unsafe, as it's up to you to make sure to only call this *if and only if*
80    /// the data truly did not change. Since you're getting a `&mut` to the data, this contract
81    /// cannot be statically enforced. So, it's up to you to use correctly.
82    pub unsafe fn noop(data: Data) -> Self {
83        Self(CommitPolicy::Noop, data)
84    }
85
86    pub(super) fn into_inner(self) -> (CommitPolicy, Data) {
87        (self.0, self.1)
88    }
89}
90
91impl From<DefaultCommitPolicy> for CommitPolicyNoDefault {
92    fn from(default_commit_policy: DefaultCommitPolicy) -> Self {
93        match default_commit_policy {
94            DefaultCommitPolicy::Within(duration) => Self::Within(duration),
95            DefaultCommitPolicy::Immediate => Self::Immediate,
96        }
97    }
98}
99
100pub(super) type ImmediateCommitWaiter = Box<dyn FnOnce(Result<(), UpstreamError>) + Send + 'static>;
101
102pub(super) type ImmediateCommitWaiters = SmallVec<[ImmediateCommitWaiter; 1]>;
103
104pub(super) enum AccumulatedCommitPolicy {
105    Noop,
106    Within(Duration),
107    Immediate(ImmediateCommitWaiters),
108}
109
110impl From<InnerCommitPolicy> for AccumulatedCommitPolicy {
111    fn from(commit_policy: InnerCommitPolicy) -> Self {
112        match commit_policy {
113            InnerCommitPolicy::Within(duration) => AccumulatedCommitPolicy::Within(duration),
114            InnerCommitPolicy::Immediate(callback) => callback.into(),
115            InnerCommitPolicy::Noop => AccumulatedCommitPolicy::Noop,
116        }
117    }
118}
119
120impl From<ImmediateCommitWaiter> for AccumulatedCommitPolicy {
121    fn from(waiter: ImmediateCommitWaiter) -> Self {
122        let mut vec = SmallVec::new();
123        vec.push(waiter);
124        AccumulatedCommitPolicy::Immediate(vec)
125    }
126}
127
128impl AccumulatedCommitPolicy {
129    pub(super) fn new() -> Self {
130        AccumulatedCommitPolicy::Noop
131    }
132
133    pub(super) fn did_mutate(&self) -> bool {
134        match self {
135            AccumulatedCommitPolicy::Noop => false,
136            AccumulatedCommitPolicy::Within(_) | AccumulatedCommitPolicy::Immediate(_) => true,
137        }
138    }
139
140    pub(super) fn accumulate(self, commit_policy: InnerCommitPolicy) -> Self {
141        match commit_policy {
142            InnerCommitPolicy::Within(duration) => self.accumulate_commit_within(duration),
143            InnerCommitPolicy::Immediate(waiter) => self.accumulate_commit_immediate(waiter),
144            InnerCommitPolicy::Noop => self,
145        }
146    }
147
148    fn accumulate_commit_within(self, duration: Duration) -> Self {
149        match self {
150            AccumulatedCommitPolicy::Noop => AccumulatedCommitPolicy::Within(duration),
151            AccumulatedCommitPolicy::Within(existing_duration) => {
152                // We want to keep the minimum of the two durations.
153                AccumulatedCommitPolicy::Within(std::cmp::min(existing_duration, duration))
154            }
155            // There's nothing faster than immediate.
156            AccumulatedCommitPolicy::Immediate(_) => self,
157        }
158    }
159
160    fn accumulate_commit_immediate(self, waiter: ImmediateCommitWaiter) -> Self {
161        match self {
162            AccumulatedCommitPolicy::Noop | AccumulatedCommitPolicy::Within(_) => waiter.into(),
163            AccumulatedCommitPolicy::Immediate(mut waiters) => {
164                waiters.push(waiter);
165                AccumulatedCommitPolicy::Immediate(waiters)
166            }
167        }
168    }
169}