thingvellir/shard/
commit_policy.rs1use tokio::time::Duration;
2
3use crate::UpstreamError;
4use smallvec::SmallVec;
5
6pub(crate) enum InnerCommitPolicy {
7 Noop,
8 Immediate(ImmediateCommitWaiter),
9 Within(Duration),
10}
11
/// The policy substituted for a commit that was requested with the "default" policy.
///
/// `CommitPolicy::apply_default` converts this value into a `CommitPolicyNoDefault`
/// whenever it has to resolve `CommitPolicy::Default`.
///
/// The type is a small plain value, so besides `Clone`/`Copy` it also implements
/// `Debug`, `PartialEq` and `Eq`, which lets it be logged and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultCommitPolicy {
    /// Resolves to `CommitPolicyNoDefault::Immediate`.
    Immediate,
    /// Resolves to `CommitPolicyNoDefault::Within` carrying the same duration.
    Within(Duration),
}
19
20pub(super) enum CommitPolicy {
21 Default,
23 Noop,
25 Immediate,
27 Within(Duration),
29}
30
31pub(super) enum CommitPolicyNoDefault {
32 Noop,
33 Immediate,
34 Within(Duration),
35}
36
37impl CommitPolicy {
38 pub(super) fn apply_default(
39 self,
40 default_commit_policy: DefaultCommitPolicy,
41 ) -> CommitPolicyNoDefault {
42 match self {
43 CommitPolicy::Default => default_commit_policy.into(),
44 CommitPolicy::Noop => CommitPolicyNoDefault::Noop,
45 CommitPolicy::Within(duration) => CommitPolicyNoDefault::Within(duration),
46 CommitPolicy::Immediate => CommitPolicyNoDefault::Immediate,
47 }
48 }
49}
50
51pub struct Commit<Data>(CommitPolicy, Data);
57
58impl<Data> Commit<Data> {
59 pub fn immediately(data: Data) -> Self {
62 Self(CommitPolicy::Immediate, data)
63 }
64
65 pub fn default(data: Data) -> Self {
67 Self(CommitPolicy::Default, data)
68 }
69
70 pub fn within(data: Data, duration: Duration) -> Self {
72 Self(CommitPolicy::Within(duration), data)
73 }
74
75 pub unsafe fn noop(data: Data) -> Self {
83 Self(CommitPolicy::Noop, data)
84 }
85
86 pub(super) fn into_inner(self) -> (CommitPolicy, Data) {
87 (self.0, self.1)
88 }
89}
90
91impl From<DefaultCommitPolicy> for CommitPolicyNoDefault {
92 fn from(default_commit_policy: DefaultCommitPolicy) -> Self {
93 match default_commit_policy {
94 DefaultCommitPolicy::Within(duration) => Self::Within(duration),
95 DefaultCommitPolicy::Immediate => Self::Immediate,
96 }
97 }
98}
99
100pub(super) type ImmediateCommitWaiter = Box<dyn FnOnce(Result<(), UpstreamError>) + Send + 'static>;
101
102pub(super) type ImmediateCommitWaiters = SmallVec<[ImmediateCommitWaiter; 1]>;
103
104pub(super) enum AccumulatedCommitPolicy {
105 Noop,
106 Within(Duration),
107 Immediate(ImmediateCommitWaiters),
108}
109
110impl From<InnerCommitPolicy> for AccumulatedCommitPolicy {
111 fn from(commit_policy: InnerCommitPolicy) -> Self {
112 match commit_policy {
113 InnerCommitPolicy::Within(duration) => AccumulatedCommitPolicy::Within(duration),
114 InnerCommitPolicy::Immediate(callback) => callback.into(),
115 InnerCommitPolicy::Noop => AccumulatedCommitPolicy::Noop,
116 }
117 }
118}
119
120impl From<ImmediateCommitWaiter> for AccumulatedCommitPolicy {
121 fn from(waiter: ImmediateCommitWaiter) -> Self {
122 let mut vec = SmallVec::new();
123 vec.push(waiter);
124 AccumulatedCommitPolicy::Immediate(vec)
125 }
126}
127
128impl AccumulatedCommitPolicy {
129 pub(super) fn new() -> Self {
130 AccumulatedCommitPolicy::Noop
131 }
132
133 pub(super) fn did_mutate(&self) -> bool {
134 match self {
135 AccumulatedCommitPolicy::Noop => false,
136 AccumulatedCommitPolicy::Within(_) | AccumulatedCommitPolicy::Immediate(_) => true,
137 }
138 }
139
140 pub(super) fn accumulate(self, commit_policy: InnerCommitPolicy) -> Self {
141 match commit_policy {
142 InnerCommitPolicy::Within(duration) => self.accumulate_commit_within(duration),
143 InnerCommitPolicy::Immediate(waiter) => self.accumulate_commit_immediate(waiter),
144 InnerCommitPolicy::Noop => self,
145 }
146 }
147
148 fn accumulate_commit_within(self, duration: Duration) -> Self {
149 match self {
150 AccumulatedCommitPolicy::Noop => AccumulatedCommitPolicy::Within(duration),
151 AccumulatedCommitPolicy::Within(existing_duration) => {
152 AccumulatedCommitPolicy::Within(std::cmp::min(existing_duration, duration))
154 }
155 AccumulatedCommitPolicy::Immediate(_) => self,
157 }
158 }
159
160 fn accumulate_commit_immediate(self, waiter: ImmediateCommitWaiter) -> Self {
161 match self {
162 AccumulatedCommitPolicy::Noop | AccumulatedCommitPolicy::Within(_) => waiter.into(),
163 AccumulatedCommitPolicy::Immediate(mut waiters) => {
164 waiters.push(waiter);
165 AccumulatedCommitPolicy::Immediate(waiters)
166 }
167 }
168 }
169}