use std::collections::HashMap;
use crate::BrokerId;
pub const INITIAL_EPOCH: i32 = 0;
#[allow(dead_code)]
pub(crate) const FINAL_EPOCH: i32 = -1;
#[derive(Debug)]
pub struct ShareSessionState {
epoch: i32,
established: bool,
}
impl ShareSessionState {
fn new() -> Self {
Self {
epoch: INITIAL_EPOCH,
established: false,
}
}
pub fn epoch(&self) -> i32 {
self.epoch
}
pub fn on_success(&mut self) {
self.established = true;
self.epoch = if self.epoch == i32::MAX {
1
} else {
self.epoch + 1
};
}
pub fn reset(&mut self) {
self.epoch = INITIAL_EPOCH;
self.established = false;
}
}
#[derive(Debug, Default)]
pub struct ShareSessionCache {
sessions: HashMap<BrokerId, ShareSessionState>,
}
impl ShareSessionCache {
pub fn new() -> Self {
Self::default()
}
pub fn get_or_create(&mut self, broker_id: BrokerId) -> &mut ShareSessionState {
self.sessions
.entry(broker_id)
.or_insert_with(ShareSessionState::new)
}
pub fn get(&self, broker_id: BrokerId) -> Option<&ShareSessionState> {
self.sessions.get(&broker_id)
}
pub fn reset_broker(&mut self, broker_id: BrokerId) {
if let Some(session) = self.sessions.get_mut(&broker_id) {
session.reset();
}
}
pub fn reset_all(&mut self) {
for session in self.sessions.values_mut() {
session.reset();
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn new_session_starts_at_epoch_zero() {
let state = ShareSessionState::new();
assert_eq!(state.epoch(), INITIAL_EPOCH);
assert!(!state.established);
}
#[test]
fn on_success_bumps_epoch() {
let mut state = ShareSessionState::new();
state.on_success();
assert_eq!(state.epoch(), 1);
assert!(state.established);
state.on_success();
assert_eq!(state.epoch(), 2);
}
#[test]
fn epoch_wraps_at_max() {
let mut state = ShareSessionState::new();
state.epoch = i32::MAX;
state.on_success();
assert_eq!(state.epoch(), 1);
}
#[test]
fn reset_returns_to_initial() {
let mut state = ShareSessionState::new();
state.on_success();
state.on_success();
assert_eq!(state.epoch(), 2);
assert!(state.established);
state.reset();
assert_eq!(state.epoch(), INITIAL_EPOCH);
assert!(!state.established);
}
#[test]
fn final_epoch_is_minus_one() {
assert_eq!(FINAL_EPOCH, -1);
}
#[test]
fn cache_get_or_create_returns_same_session() {
let mut cache = ShareSessionCache::new();
cache.get_or_create(1).on_success();
assert_eq!(cache.get_or_create(1).epoch(), 1);
}
#[test]
fn cache_reset_broker() {
let mut cache = ShareSessionCache::new();
cache.get_or_create(1).on_success();
cache.get_or_create(1).on_success();
assert_eq!(cache.get(1).unwrap().epoch(), 2);
cache.reset_broker(1);
assert_eq!(cache.get(1).unwrap().epoch(), INITIAL_EPOCH);
}
#[test]
fn cache_reset_all() {
let mut cache = ShareSessionCache::new();
cache.get_or_create(1).on_success();
cache.get_or_create(2).on_success();
cache.get_or_create(2).on_success();
cache.reset_all();
assert_eq!(cache.get(1).unwrap().epoch(), INITIAL_EPOCH);
assert_eq!(cache.get(2).unwrap().epoch(), INITIAL_EPOCH);
}
}