use tokio::time::Duration;
use crate::UpstreamError;
use smallvec::SmallVec;
pub(crate) enum InnerCommitPolicy {
Noop,
Immediate(ImmediateCommitWaiter),
Within(Duration),
}
#[derive(Clone, Copy)]
pub enum DefaultCommitPolicy {
Immediate,
Within(Duration),
}
pub(super) enum CommitPolicy {
Default,
Noop,
Immediate,
Within(Duration),
}
pub(super) enum CommitPolicyNoDefault {
Noop,
Immediate,
Within(Duration),
}
impl CommitPolicy {
pub(super) fn apply_default(
self,
default_commit_policy: DefaultCommitPolicy,
) -> CommitPolicyNoDefault {
match self {
CommitPolicy::Default => default_commit_policy.into(),
CommitPolicy::Noop => CommitPolicyNoDefault::Noop,
CommitPolicy::Within(duration) => CommitPolicyNoDefault::Within(duration),
CommitPolicy::Immediate => CommitPolicyNoDefault::Immediate,
}
}
}
pub struct Commit<Data>(CommitPolicy, Data);
impl<Data> Commit<Data> {
pub fn immediately(data: Data) -> Self {
Self(CommitPolicy::Immediate, data)
}
pub fn default(data: Data) -> Self {
Self(CommitPolicy::Default, data)
}
pub fn within(data: Data, duration: Duration) -> Self {
Self(CommitPolicy::Within(duration), data)
}
pub unsafe fn noop(data: Data) -> Self {
Self(CommitPolicy::Noop, data)
}
pub(super) fn into_inner(self) -> (CommitPolicy, Data) {
(self.0, self.1)
}
}
impl From<DefaultCommitPolicy> for CommitPolicyNoDefault {
fn from(default_commit_policy: DefaultCommitPolicy) -> Self {
match default_commit_policy {
DefaultCommitPolicy::Within(duration) => Self::Within(duration),
DefaultCommitPolicy::Immediate => Self::Immediate,
}
}
}
pub(super) type ImmediateCommitWaiter = Box<dyn FnOnce(Result<(), UpstreamError>) + Send + 'static>;
pub(super) type ImmediateCommitWaiters = SmallVec<[ImmediateCommitWaiter; 1]>;
pub(super) enum AccumulatedCommitPolicy {
Noop,
Within(Duration),
Immediate(ImmediateCommitWaiters),
}
impl From<InnerCommitPolicy> for AccumulatedCommitPolicy {
fn from(commit_policy: InnerCommitPolicy) -> Self {
match commit_policy {
InnerCommitPolicy::Within(duration) => AccumulatedCommitPolicy::Within(duration),
InnerCommitPolicy::Immediate(callback) => callback.into(),
InnerCommitPolicy::Noop => AccumulatedCommitPolicy::Noop,
}
}
}
impl From<ImmediateCommitWaiter> for AccumulatedCommitPolicy {
fn from(waiter: ImmediateCommitWaiter) -> Self {
let mut vec = SmallVec::new();
vec.push(waiter);
AccumulatedCommitPolicy::Immediate(vec)
}
}
impl AccumulatedCommitPolicy {
pub(super) fn new() -> Self {
AccumulatedCommitPolicy::Noop
}
pub(super) fn did_mutate(&self) -> bool {
match self {
AccumulatedCommitPolicy::Noop => false,
AccumulatedCommitPolicy::Within(_) | AccumulatedCommitPolicy::Immediate(_) => true,
}
}
pub(super) fn accumulate(self, commit_policy: InnerCommitPolicy) -> Self {
match commit_policy {
InnerCommitPolicy::Within(duration) => self.accumulate_commit_within(duration),
InnerCommitPolicy::Immediate(waiter) => self.accumulate_commit_immediate(waiter),
InnerCommitPolicy::Noop => self,
}
}
fn accumulate_commit_within(self, duration: Duration) -> Self {
match self {
AccumulatedCommitPolicy::Noop => AccumulatedCommitPolicy::Within(duration),
AccumulatedCommitPolicy::Within(existing_duration) => {
AccumulatedCommitPolicy::Within(std::cmp::min(existing_duration, duration))
}
AccumulatedCommitPolicy::Immediate(_) => self,
}
}
fn accumulate_commit_immediate(self, waiter: ImmediateCommitWaiter) -> Self {
match self {
AccumulatedCommitPolicy::Noop | AccumulatedCommitPolicy::Within(_) => waiter.into(),
AccumulatedCommitPolicy::Immediate(mut waiters) => {
waiters.push(waiter);
AccumulatedCommitPolicy::Immediate(waiters)
}
}
}
}