use std::{
fmt::Debug,
io::{Error, ErrorKind},
time::{Duration, SystemTime},
};
use crate::{
DriveOutcome, Flush, Publish, PublishOutcome, Receive, ReceiveOutcome, Session, SessionStatus,
};
pub struct HeartbeatingSession<S> {
session: S,
interval: Duration,
heartbeat_writer: Box<dyn FnMut(&mut S) -> Result<HeartbeatOutcome, Error> + Send + 'static>,
next_heartbeat: SystemTime,
}
impl<S> HeartbeatingSession<S>
where
S: Publish + 'static,
{
pub fn new<F>(session: S, interval: Duration, heartbeat_writer: F) -> Self
where
F: for<'a> FnMut(&mut S) -> Result<HeartbeatOutcome, Error> + Send + 'static,
{
Self {
session,
interval,
heartbeat_writer: Box::new(heartbeat_writer),
next_heartbeat: SystemTime::now() + interval,
}
}
pub fn session<'a>(&'a mut self) -> &'a S {
&self.session
}
pub fn session_mut<'a>(&'a mut self) -> &'a mut S {
&mut self.session
}
}
impl<S> Session for HeartbeatingSession<S>
where
S: Publish + 'static,
{
fn status(&self) -> SessionStatus {
self.session.status()
}
fn drive(&mut self) -> Result<DriveOutcome, std::io::Error> {
let now = SystemTime::now();
if now >= self.next_heartbeat {
if let HeartbeatOutcome::Sent | HeartbeatOutcome::Skipped =
(self.heartbeat_writer)(&mut self.session)?
{
self.next_heartbeat = now + self.interval;
self.session.drive()?;
return Ok(DriveOutcome::Active);
}
}
self.session.drive()
}
}
impl<S> Publish for HeartbeatingSession<S>
where
    S: Publish + 'static,
{
    type PublishPayload<'a> = S::PublishPayload<'a>;

    /// Forwards `payload` unchanged to the wrapped session and returns its result as-is.
    fn publish<'a>(
        &mut self,
        payload: Self::PublishPayload<'a>,
    ) -> Result<crate::PublishOutcome<Self::PublishPayload<'a>>, std::io::Error> {
        let inner = &mut self.session;
        inner.publish(payload)
    }
}
impl<S> Receive for HeartbeatingSession<S>
where
    S: Receive + Publish + 'static,
{
    type ReceivePayload<'a> = S::ReceivePayload<'a>;

    /// Forwards the call to the wrapped session and returns its result as-is.
    fn receive<'a>(
        &'a mut self,
    ) -> Result<crate::ReceiveOutcome<Self::ReceivePayload<'a>>, std::io::Error> {
        let inner: &'a mut S = &mut self.session;
        inner.receive()
    }
}
impl<S> Flush for HeartbeatingSession<S>
where
S: Receive + Flush + 'static,
{
fn flush(&mut self) -> Result<(), std::io::Error> {
self.session.flush()
}
}
impl<S> Debug for HeartbeatingSession<S>
where
    S: Publish + 'static,
{
    /// Formats the wrapped session together with the heartbeat schedule.
    ///
    /// The heartbeat writer is a boxed closure and cannot be printed, so the output is marked
    /// as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HeartbeatingSession")
            .field("session", &self.session)
            .field("interval", &self.interval)
            .field("next_heartbeat", &self.next_heartbeat)
            .finish_non_exhaustive()
    }
}
/// Value returned by the heartbeat writer given to `HeartbeatingSession::new`.
///
/// `HeartbeatingSession::drive` uses it to decide whether the next heartbeat is rescheduled.
#[derive(Debug, Clone, Copy)]
pub enum HeartbeatOutcome {
/// Treated by `drive` as handled: the next heartbeat is scheduled one interval later.
Sent,
/// Handled by `drive` exactly like `Sent`: the next heartbeat is scheduled one interval later.
Skipped,
/// Not rescheduled by `drive`, so the writer is invoked again on the next `drive` call.
Failed,
}
pub struct LivenessSession<S> {
session: S,
timeout: Duration,
strategy: LivenessStrategy,
liveness: SystemTime,
}
impl<S> LivenessSession<S>
where
S: Receive + 'static,
{
pub fn new(session: S, timeout: Duration, strategy: LivenessStrategy) -> Self {
Self {
session,
timeout,
strategy,
liveness: SystemTime::now() + timeout,
}
}
pub fn session<'a>(&'a mut self) -> &'a S {
&self.session
}
pub fn session_mut<'a>(&'a mut self) -> &'a mut S {
&mut self.session
}
}
impl<S> Session for LivenessSession<S>
where
S: Receive + 'static,
{
fn status(&self) -> crate::SessionStatus {
self.session.status()
}
fn drive(&mut self) -> Result<DriveOutcome, std::io::Error> {
match self.session.drive()? {
DriveOutcome::Active => {
if self.strategy.drive {
self.liveness = SystemTime::now() + self.timeout;
}
Ok(DriveOutcome::Active)
}
DriveOutcome::Idle => {
if SystemTime::now() > self.liveness {
Err(Error::new(ErrorKind::TimedOut, "liveness check"))
} else {
Ok(DriveOutcome::Idle)
}
}
}
}
}
impl<S> Receive for LivenessSession<S>
where
S: Receive + 'static,
{
type ReceivePayload<'a> = S::ReceivePayload<'a>;
fn receive<'a>(
&'a mut self,
) -> Result<crate::ReceiveOutcome<Self::ReceivePayload<'a>>, std::io::Error> {
let r = self.session.receive()?;
if let ReceiveOutcome::Payload(_) | ReceiveOutcome::Active = r {
if self.strategy.receive {
self.liveness = SystemTime::now() + self.timeout;
}
}
Ok(r)
}
}
impl<S> Publish for LivenessSession<S>
where
S: Publish + Receive + 'static,
{
type PublishPayload<'a> = S::PublishPayload<'a>;
fn publish<'a>(
&mut self,
payload: Self::PublishPayload<'a>,
) -> Result<crate::PublishOutcome<Self::PublishPayload<'a>>, std::io::Error> {
let r = self.session.publish(payload)?;
if let PublishOutcome::Published = r {
if self.strategy.publish {
self.liveness = SystemTime::now() + self.timeout;
}
}
Ok(r)
}
}
impl<S> Flush for LivenessSession<S>
where
    S: Flush + Receive + 'static,
{
    /// Forwards the flush to the wrapped session and returns its result as-is.
    fn flush(&mut self) -> Result<(), std::io::Error> {
        let inner = &mut self.session;
        inner.flush()
    }
}
impl<S> Debug for LivenessSession<S>
where
    S: Receive + 'static,
{
    /// Formats the wrapped session together with the timeout configuration and the current
    /// deadline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LivenessSession")
            .field("session", &self.session)
            .field("timeout", &self.timeout)
            .field("strategy", &self.strategy)
            .field("liveness", &self.liveness)
            .finish()
    }
}
/// Selects which kinds of activity refresh the deadline of a `LivenessSession`.
#[derive(Debug, Clone, Copy)]
pub struct LivenessStrategy {
    /// Whether an `Active` outcome from `drive` refreshes the deadline.
    drive: bool,
    /// Whether a `Payload` or `Active` outcome from `receive` refreshes the deadline.
    receive: bool,
    /// Whether a `Published` outcome from `publish` refreshes the deadline.
    publish: bool,
}

impl LivenessStrategy {
    /// Creates a strategy with every flag disabled.
    ///
    /// Note that this differs from the `Default` implementation, which enables every flag.
    pub fn new() -> Self {
        Self {
            drive: false,
            receive: false,
            publish: false,
        }
    }

    /// Returns whether `drive` activity refreshes the deadline.
    pub fn is_drive(&self) -> bool {
        self.drive
    }

    /// Returns whether `receive` activity refreshes the deadline.
    pub fn is_receive(&self) -> bool {
        self.receive
    }

    /// Returns whether `publish` activity refreshes the deadline.
    pub fn is_publish(&self) -> bool {
        self.publish
    }

    /// Sets whether `drive` activity refreshes the deadline.
    pub fn set_drive(&mut self, drive: bool) {
        self.drive = drive;
    }

    /// Sets whether `receive` activity refreshes the deadline.
    pub fn set_receive(&mut self, receive: bool) {
        self.receive = receive;
    }

    /// Sets whether `publish` activity refreshes the deadline.
    pub fn set_publish(&mut self, publish: bool) {
        self.publish = publish;
    }

    /// Builder-style variant of [`Self::set_drive`] that consumes and returns the strategy.
    pub fn with_drive(mut self, drive: bool) -> Self {
        self.drive = drive;
        self
    }

    /// Builder-style variant of [`Self::set_receive`] that consumes and returns the strategy.
    pub fn with_receive(mut self, receive: bool) -> Self {
        self.receive = receive;
        self
    }

    /// Builder-style variant of [`Self::set_publish`] that consumes and returns the strategy.
    pub fn with_publish(mut self, publish: bool) -> Self {
        self.publish = publish;
        self
    }
}
impl Default for LivenessStrategy {
    /// Returns a strategy in which `drive`, `receive` and `publish` are all enabled.
    fn default() -> Self {
        Self::new()
            .with_drive(true)
            .with_receive(true)
            .with_publish(true)
    }
}