use std::fmt;
use std::str::FromStr;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use crate::Error;
/// Selects a dispatch strategy.
///
/// Only a single variant exists so far; the enum is `#[non_exhaustive]`, so
/// code outside this crate must keep a wildcard arm when matching on it.
///
/// With the serde attributes below the JSON form is an object carrying a
/// `strategy` tag in snake case, e.g. `{"strategy": "all_sequential"}`.
///
/// NOTE(review): what "dispatching" means operationally is decided by the
/// code consuming this type, which is not part of this file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case", tag = "strategy")]
pub enum DispatchStrategy {
    /// Textual form: `all_sequential`. This is also the `Default`.
    AllSequential,
}
impl DispatchStrategy {
// `env_funs!` is a macro defined elsewhere in the crate; it is expanded here
// with the name "DISPATCH_STRATEGY".
// NOTE(review): presumably it generates helpers that initialise this type
// from an environment variable of that name (via `FromStr`) - confirm
// against the macro definition.
env_funs!("DISPATCH_STRATEGY");
}
impl Default for DispatchStrategy {
fn default() -> Self {
DispatchStrategy::AllSequential
}
}
impl fmt::Display for DispatchStrategy {
    /// Writes the textual form of the strategy, which is the same token that
    /// `FromStr` accepts (`all_sequential`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            DispatchStrategy::AllSequential => "all_sequential",
        };
        f.write_str(text)
    }
}
impl FromStr for DispatchStrategy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim();
if s.starts_with('{') {
return Ok(serde_json::from_str(s)?);
}
match s {
"all_sequential" => Ok(DispatchStrategy::AllSequential),
_ => Err(Error::new(format!("not a valid dispatch strategy: {}", s))),
}
}
}
/// Describes when cursors are committed.
///
/// Textual forms (see the `FromStr` / `Display` impls): `immediately`,
/// `latest_possible`, and `after` followed by any of `seconds:<n>`,
/// `cursors:<n>`, `events:<n>`.
///
/// With the serde attributes below the JSON form is an object carrying a
/// `strategy` tag in snake case, e.g. `{"strategy": "after", "seconds": 10}`.
///
/// NOTE(review): the precise commit timing behind each variant is
/// implemented by the consumer of this type, not in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "strategy")]
pub enum CommitStrategy {
    /// Textual form: `immediately`.
    Immediately,
    /// Textual form: `latest_possible`.
    LatestPossible,
    /// Commit after one of the given limits.
    ///
    /// At least one limit is expected to be set and limits are expected to
    /// be non-zero; see `CommitStrategy::validate`. Unset limits are left
    /// out when serializing.
    After {
        /// Limit in seconds.
        #[serde(skip_serializing_if = "Option::is_none")]
        seconds: Option<u32>,
        /// Limit in number of cursors.
        #[serde(skip_serializing_if = "Option::is_none")]
        cursors: Option<u32>,
        /// Limit in number of events.
        #[serde(skip_serializing_if = "Option::is_none")]
        events: Option<u32>,
    },
}
impl CommitStrategy {
env_funs!("COMMIT_STRATEGY");
pub fn validate(&self) -> Result<(), Error> {
match self {
CommitStrategy::After {
seconds: None,
cursors: None,
events: None,
} => Err(Error::new(
"'CommitStrategy::After' with all fields set to `None` is not valid",
)),
CommitStrategy::After {
seconds,
cursors,
events,
} => {
if let Some(seconds) = seconds {
if *seconds == 0 {
return Err(Error::new("'CommitStrategy::After::seconds' must not be 0"));
}
} else if let Some(cursors) = cursors {
if *cursors == 0 {
return Err(Error::new("'CommitStrategy::After::cursors' must not be 0"));
}
} else if let Some(events) = events {
if *events == 0 {
return Err(Error::new("'CommitStrategy::After::events' must not be 0"));
}
}
Ok(())
}
_ => Ok(()),
}
}
}
impl FromStr for CommitStrategy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn parse_after(s: &str) -> Result<CommitStrategy, Error> {
let parts: Vec<_> = s.split(' ').filter(|s| !s.is_empty()).collect();
let mut seconds: Option<u32> = None;
let mut cursors: Option<u32> = None;
let mut events: Option<u32> = None;
if parts.is_empty() {
return Err(Error::new("invalid"));
}
if parts[0] != "after" {
return Err(Error::new("must start with 'after'"));
}
for p in parts.into_iter().skip(1) {
let parts: Vec<_> = p.split(':').collect();
if parts.len() != 2 {
return Err(Error::new(format!("not valid: {}", p)));
}
let v: u32 = parts[1]
.parse()
.map_err(|err| Error::new(format!("{} not an u32: {}", parts[0], err)))?;
match parts[0] {
"seconds" => seconds = Some(v),
"cursors" => cursors = Some(v),
"events" => events = Some(v),
_ => {
return Err(Error::new(format!(
"not a part of CommitStrategy: {}",
parts[0]
)))
}
}
}
Ok(CommitStrategy::After {
seconds,
events,
cursors,
})
}
let s = s.trim();
if s.starts_with('{') {
return Ok(serde_json::from_str(s)?);
}
let strategy = match s {
"immediately" => CommitStrategy::Immediately,
"latest_possible" => CommitStrategy::LatestPossible,
_ => parse_after(s).map_err(|err| {
Error::new(format!(
"could not parse CommitStrategy from {}: {}",
s, err
))
})?,
};
strategy.validate()?;
Ok(strategy)
}
}
impl fmt::Display for CommitStrategy {
    /// Writes the textual form of the strategy.
    ///
    /// `After` is rendered as `after` followed by one ` key:value` pair for
    /// every limit that is set, always in the order `seconds`, `cursors`,
    /// `events`. With no limit set the output is just `after`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            CommitStrategy::Immediately => f.write_str("immediately"),
            CommitStrategy::LatestPossible => f.write_str("latest_possible"),
            CommitStrategy::After {
                seconds,
                cursors,
                events,
            } => {
                f.write_str("after")?;
                let limits = [
                    ("seconds", seconds),
                    ("cursors", cursors),
                    ("events", events),
                ];
                // Prefixing each pair with the separator avoids a trailing space.
                for &(label, limit) in limits.iter() {
                    if let Some(value) = limit {
                        write!(f, " {}:{}", label, value)?;
                    }
                }
                Ok(())
            }
        }
    }
}
/// Decides when a stream is considered dead, based on how long ago data was
/// last received (see `StreamDeadPolicy::is_stream_dead`).
///
/// Textual forms (see the `FromStr` / `Display` impls): `never`,
/// `no_frames_for_seconds <n>` and `no_events_for_seconds <n>`.
///
/// With the serde attributes below the JSON form is an object carrying a
/// `policy` tag in snake case, e.g. `{"policy": "no_frames_for", "seconds": 30}`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "policy")]
#[non_exhaustive]
pub enum StreamDeadPolicy {
    /// The stream is never considered dead. This is the `Default`.
    Never,
    /// Dead once more than `seconds` have elapsed since the last frame.
    NoFramesFor { seconds: u32 },
    /// Dead once more than `seconds` have elapsed since the last events.
    NoEventsFor { seconds: u32 },
}
impl StreamDeadPolicy {
    // `env_funs!` is a macro defined elsewhere in the crate; it is expanded
    // here with the name "STREAM_DEAD_POLICY".
    // NOTE(review): presumably it generates environment-variable based
    // initialisers - confirm against the macro definition.
    env_funs!("STREAM_DEAD_POLICY");

    /// Returns `true` if, according to this policy, the stream is dead.
    ///
    /// * `Never` always yields `false`.
    /// * `NoFramesFor` compares the time elapsed since `last_frame_received_at`.
    /// * `NoEventsFor` compares the time elapsed since `last_events_received_at`.
    ///
    /// The stream counts as dead only when strictly more than `seconds` have
    /// elapsed.
    pub(crate) fn is_stream_dead(
        self,
        last_frame_received_at: Instant,
        last_events_received_at: Instant,
    ) -> bool {
        let (last_seen, seconds) = match self {
            StreamDeadPolicy::Never => return false,
            StreamDeadPolicy::NoFramesFor { seconds } => (last_frame_received_at, seconds),
            StreamDeadPolicy::NoEventsFor { seconds } => (last_events_received_at, seconds),
        };
        last_seen.elapsed() > Duration::from_secs(u64::from(seconds))
    }

    /// Checks that the policy is usable.
    ///
    /// # Errors
    ///
    /// Returns an error if `NoFramesFor` or `NoEventsFor` is configured with
    /// `seconds == 0`. The message names the offending variant.
    pub fn validate(self) -> Result<(), Error> {
        match self {
            StreamDeadPolicy::NoFramesFor { seconds: 0 } => Err(Error::new(
                "StreamDeadPolicy::NoFramesFor::seconds may not be 0",
            )),
            StreamDeadPolicy::NoEventsFor { seconds: 0 } => Err(Error::new(
                "StreamDeadPolicy::NoEventsFor::seconds may not be 0",
            )),
            StreamDeadPolicy::Never
            | StreamDeadPolicy::NoFramesFor { .. }
            | StreamDeadPolicy::NoEventsFor { .. } => Ok(()),
        }
    }
}
impl fmt::Display for StreamDeadPolicy {
    /// Writes the textual form of the policy: `never`,
    /// `no_frames_for_seconds <n>` or `no_events_for_seconds <n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            StreamDeadPolicy::Never => f.write_str("never"),
            StreamDeadPolicy::NoFramesFor { seconds } => {
                write!(f, "no_frames_for_seconds {}", seconds)
            }
            StreamDeadPolicy::NoEventsFor { seconds } => {
                write!(f, "no_events_for_seconds {}", seconds)
            }
        }
    }
}
impl Default for StreamDeadPolicy {
fn default() -> Self {
StreamDeadPolicy::Never
}
}
impl FromStr for StreamDeadPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn parse_internal(s: &str) -> Result<StreamDeadPolicy, Error> {
let parts: Vec<_> = s.split(' ').filter(|s| !s.is_empty()).collect();
if parts.is_empty() {
return Err(Error::new("invalid"));
}
if parts[0] == "no_frames_for_seconds" {
return parse_no_frames_for(parts);
}
if parts[0] == "no_events_for_seconds" {
return parse_no_events_for(parts);
}
Err(Error::new("not a StreamDeadPolicy"))
}
let s = s.trim();
if s.starts_with('{') {
return Ok(serde_json::from_str(s)?);
}
let strategy = match s {
"never" => StreamDeadPolicy::Never,
_ => parse_internal(s).map_err(|err| {
Error::new(format!(
"could not parse CommitStrategy from {}: {}",
s, err
))
})?,
};
Ok(strategy)
}
}
fn parse_no_frames_for(parts: Vec<&str>) -> Result<StreamDeadPolicy, Error> {
if parts[0] != "no_frames_for_seconds" {
return Err(Error::new("not StreamDeadPolicy::NoFramesFor"));
}
if parts.len() == 2 {
let seconds: u32 = parts[1]
.parse()
.map_err(|err| Error::new(format!("{} not an u32: {}", parts[0], err)))?;
Ok(StreamDeadPolicy::NoFramesFor { seconds })
} else {
Err(Error::new("not StreamDeadPolicy::NoFramesFor"))
}
}
fn parse_no_events_for(parts: Vec<&str>) -> Result<StreamDeadPolicy, Error> {
if parts[0] != "no_events_for_seconds" {
return Err(Error::new("not StreamDeadPolicy::NoEventsFor"));
}
if parts.len() == 2 {
let seconds: u32 = parts[1]
.parse()
.map_err(|err| Error::new(format!("{} not an u32: {}", parts[0], err)))?;
Ok(StreamDeadPolicy::NoEventsFor { seconds })
} else {
Err(Error::new("not StreamDeadPolicy::NoEventsFor"))
}
}