use std::convert::AsRef;
use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::model::misc::{AuthorizationAttribute, OwningApplication};
use crate::model::partition::{Cursor, PartitionId};
use crate::Error;
pub mod subscription_builder;
pub use crate::model::event_type::EventTypeName;
new_type! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub copy struct SubscriptionId(Uuid, env="SUBSCRIPTION_ID");
}
impl SubscriptionId {
pub fn random() -> Self {
Self(Uuid::new_v4())
}
}
new_type! {
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub copy struct StreamId(Uuid);
}
impl StreamId {
pub fn random() -> Self {
Self(Uuid::new_v4())
}
}
pub trait EventTypePartitionLike {
fn event_type(&self) -> &EventTypeName;
fn partition(&self) -> &PartitionId;
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct EventTypePartition {
pub event_type: EventTypeName,
pub partition: PartitionId,
}
impl EventTypePartition {
pub fn new<E: Into<EventTypeName>, P: Into<PartitionId>>(event_type: E, partition: P) -> Self {
Self {
event_type: event_type.into(),
partition: partition.into(),
}
}
}
impl EventTypePartitionLike for EventTypePartition {
fn event_type(&self) -> &EventTypeName {
&self.event_type
}
fn partition(&self) -> &PartitionId {
&self.partition
}
}
impl From<SubscriptionCursor> for EventTypePartition {
fn from(v: SubscriptionCursor) -> Self {
Self {
event_type: v.event_type,
partition: v.cursor.partition,
}
}
}
impl From<SubscriptionCursorWithoutToken> for EventTypePartition {
fn from(v: SubscriptionCursorWithoutToken) -> Self {
Self {
event_type: v.event_type,
partition: v.cursor.partition,
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EventTypeNames(Vec<EventTypeName>);
impl EventTypeNames {
pub fn new<T: Into<Vec<EventTypeName>>>(event_types: T) -> Self {
Self(event_types.into())
}
pub fn into_inner(self) -> Vec<EventTypeName> {
self.0
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn push<T: Into<EventTypeName>>(&mut self, event_type: T) {
self.0.push(event_type.into())
}
}
impl AsRef<[EventTypeName]> for EventTypeNames {
fn as_ref(&self) -> &[EventTypeName] {
&self.0
}
}
new_type! {
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ConsumerGroup(String, env="CONSUMER_GROUP");
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionEventTypeStatus {
pub event_type: EventTypeName,
pub partitions: Vec<SubscriptionPartitionStatus>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriptionPartitionStatus {
pub partition: PartitionId,
pub state: PartitionState,
pub stream_id: Option<StreamId>,
pub assignment_type: Option<PartitionAssignmentType>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionState {
#[serde(rename = "unassigned")]
Unassigned,
#[serde(rename = "reassigned")]
Reassigned,
#[serde(rename = "assigned")]
Assigned,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionAssignmentType {
#[serde(rename = "direct")]
Direct,
#[serde(rename = "auto")]
Auto,
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriptionAuthorization {
pub admins: Vec<AuthorizationAttribute>,
pub readers: Vec<AuthorizationAttribute>,
}
impl SubscriptionAuthorization {
pub fn add_admin<T: Into<AuthorizationAttribute>>(&mut self, admin: T) {
self.admins.push(admin.into())
}
pub fn add_reader<T: Into<AuthorizationAttribute>>(&mut self, reader: T) {
self.readers.push(reader.into())
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct Subscription {
pub id: SubscriptionId,
pub owning_application: OwningApplication,
pub event_types: EventTypeNames,
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer_group: Option<ConsumerGroup>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize)]
pub struct SubscriptionInput {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<SubscriptionId>,
pub owning_application: OwningApplication,
pub event_types: EventTypeNames,
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer_group: Option<ConsumerGroup>,
pub read_from: ReadFrom,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_cursors: Option<Vec<SubscriptionCursorWithoutToken>>,
pub authorization: SubscriptionAuthorization,
}
impl SubscriptionInput {
pub fn builder() -> subscription_builder::SubscriptionInputBuilder {
subscription_builder::SubscriptionInputBuilder::default()
}
}
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadFrom {
Start,
End,
Cursors,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriptionCursorWithoutToken {
#[serde(flatten)]
pub cursor: Cursor,
pub event_type: EventTypeName,
}
impl EventTypePartitionLike for SubscriptionCursorWithoutToken {
fn event_type(&self) -> &EventTypeName {
&self.event_type
}
fn partition(&self) -> &PartitionId {
&self.cursor.partition
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CursorToken(String);
impl CursorToken {
pub fn new<T: Into<String>>(token: T) -> Self {
Self(token.into())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriptionCursor {
#[serde(flatten)]
pub cursor: Cursor,
pub event_type: EventTypeName,
pub cursor_token: CursorToken,
}
impl EventTypePartitionLike for SubscriptionCursor {
fn event_type(&self) -> &EventTypeName {
&self.event_type
}
fn partition(&self) -> &PartitionId {
&self.cursor.partition
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CursorCommitResults {
pub commit_results: Vec<CursorCommitResult>,
}
impl CursorCommitResults {
pub fn all_committed(&self) -> bool {
self.commit_results
.iter()
.all(|r| r.result == CommitResult::Committed)
}
pub fn iter_committed_cursors(&self) -> impl Iterator<Item = &SubscriptionCursor> {
self.commit_results
.iter()
.filter(|r| r.is_committed())
.map(|r| &r.cursor)
}
pub fn into_iter_committed_cursors(self) -> impl Iterator<Item = SubscriptionCursor> {
self.commit_results
.into_iter()
.filter(|r| r.is_committed())
.map(|r| r.cursor)
}
pub fn iter_outdated_cursors(&self) -> impl Iterator<Item = &SubscriptionCursor> {
self.commit_results
.iter()
.filter(|r| r.is_outdated())
.map(|r| &r.cursor)
}
pub fn into_iter_outdated_cursors(self) -> impl Iterator<Item = SubscriptionCursor> {
self.commit_results
.into_iter()
.filter(|r| r.is_outdated())
.map(|r| r.cursor)
}
pub fn into_inner(self) -> Vec<CursorCommitResult> {
self.commit_results
}
}
impl From<CursorCommitResults> for Vec<CursorCommitResult> {
fn from(v: CursorCommitResults) -> Self {
v.into_inner()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorCommitResult {
pub cursor: SubscriptionCursor,
pub result: CommitResult,
}
impl CursorCommitResult {
pub fn is_committed(&self) -> bool {
self.result == CommitResult::Committed
}
pub fn is_outdated(&self) -> bool {
self.result == CommitResult::Outdated
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CommitResult {
Committed,
Outdated,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionEventTypeStats {
pub event_type: EventTypeName,
#[serde(default)]
pub partitions: Vec<SubscriptionEventTypePartitionStats>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionEventTypePartitionStats {
partition: PartitionId,
unconsumed_events: u64,
#[serde(deserialize_with = "crate::deserialize_empty_string_is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
stream_id: Option<StreamId>,
#[serde(skip_serializing_if = "Option::is_none")]
consumer_lag_seconds: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
assignment_type: Option<String>,
state: SubscriptionPartitionState,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionPartitionState {
Assigned,
Unassigned,
Reassigning,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionPartitionAssignmentType {
Direct,
Auto,
}
#[cfg(test)]
mod test {
use super::*;
use crate::model::partition::CursorOffset;
use serde_json::{self, json};
#[test]
fn subscription_cursor_without_token() {
let json = json!({
"event_type": "the event",
"partition": "the partition",
"offset": "12345",
});
let sample = SubscriptionCursorWithoutToken {
event_type: EventTypeName::new("the event"),
cursor: Cursor {
partition: PartitionId::new("the partition"),
offset: CursorOffset::new("12345"),
},
};
assert_eq!(
serde_json::to_value(sample.clone()).unwrap(),
json,
"serialize"
);
assert_eq!(
serde_json::from_value::<SubscriptionCursorWithoutToken>(json).unwrap(),
sample,
"deserialize"
);
}
#[test]
fn subscription_cursor() {
let json = json!({
"event_type": "the event",
"partition": "the partition",
"offset": "12345",
"cursor_token": "abcdef",
});
let sample = SubscriptionCursor {
event_type: EventTypeName::new("the event"),
cursor_token: CursorToken::new("abcdef"),
cursor: Cursor {
partition: PartitionId::new("the partition"),
offset: CursorOffset::new("12345"),
},
};
assert_eq!(
serde_json::to_value(sample.clone()).unwrap(),
json,
"serialize"
);
assert_eq!(
serde_json::from_value::<SubscriptionCursor>(json).unwrap(),
sample,
"deserialize"
);
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StreamParameters {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub partitions: Vec<EventTypePartition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_uncommitted_events: Option<MaxUncommittedEvents>,
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_limit: Option<BatchLimit>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_limit: Option<StreamLimit>,
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_flush_timeout: Option<BatchFlushTimeoutSecs>,
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_timespan: Option<BatchTimespanSecs>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_timeout: Option<StreamTimeoutSecs>,
#[serde(skip_serializing_if = "Option::is_none")]
pub commit_timeout: Option<CommitTimeoutSecs>,
}
impl StreamParameters {
pub fn from_env() -> Result<Self, Error> {
let mut me = Self::default();
me.fill_from_env()?;
Ok(me)
}
pub fn from_env_prefixed<T: AsRef<str>>(prefix: T) -> Result<Self, Error> {
let mut me = Self::default();
me.fill_from_env_prefixed(prefix)?;
Ok(me)
}
pub fn fill_from_env(&mut self) -> Result<(), Error> {
self.fill_from_env_prefixed(crate::helpers::NAKADION_PREFIX)
}
pub fn fill_from_env_prefixed<T: AsRef<str>>(&mut self, prefix: T) -> Result<(), Error> {
if self.max_uncommitted_events.is_none() {
self.max_uncommitted_events =
MaxUncommittedEvents::try_from_env_prefixed(prefix.as_ref())?;
}
if self.batch_limit.is_none() {
self.batch_limit = BatchLimit::try_from_env_prefixed(prefix.as_ref())?;
}
if self.stream_limit.is_none() {
self.stream_limit = StreamLimit::try_from_env_prefixed(prefix.as_ref())?;
}
if self.batch_flush_timeout.is_none() {
self.batch_flush_timeout =
BatchFlushTimeoutSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.batch_timespan.is_none() {
self.batch_timespan = BatchTimespanSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.stream_timeout.is_none() {
self.stream_timeout = StreamTimeoutSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.commit_timeout.is_none() {
self.commit_timeout = CommitTimeoutSecs::try_from_env_prefixed(prefix.as_ref())?;
}
Ok(())
}
pub fn partitions(mut self, partitions: Vec<EventTypePartition>) -> Self {
self.partitions = partitions;
self
}
pub fn max_uncommitted_events<T: Into<MaxUncommittedEvents>>(mut self, value: T) -> Self {
self.max_uncommitted_events = Some(value.into());
self
}
pub fn batch_limit<T: Into<BatchLimit>>(mut self, value: T) -> Self {
self.batch_limit = Some(value.into());
self
}
pub fn stream_limit<T: Into<StreamLimit>>(mut self, value: T) -> Self {
self.stream_limit = Some(value.into());
self
}
pub fn batch_flush_timeout<T: Into<BatchFlushTimeoutSecs>>(mut self, value: T) -> Self {
self.batch_flush_timeout = Some(value.into());
self
}
pub fn batch_timespan<T: Into<BatchTimespanSecs>>(mut self, value: T) -> Self {
self.batch_timespan = Some(value.into());
self
}
pub fn stream_timeout<T: Into<StreamTimeoutSecs>>(mut self, value: T) -> Self {
self.stream_timeout = Some(value.into());
self
}
pub fn commit_timeout<T: Into<CommitTimeoutSecs>>(mut self, value: T) -> Self {
self.commit_timeout = Some(value.into());
self
}
pub fn effective_commit_timeout_secs(&self) -> u32 {
self.commit_timeout.map(|s| s.into_inner()).unwrap_or(60)
}
pub fn effective_max_uncommitted_events(&self) -> u32 {
self.max_uncommitted_events
.map(|s| s.into_inner())
.unwrap_or(10)
}
pub fn effective_batch_limit(&self) -> u32 {
self.batch_limit.map(|s| s.into_inner()).unwrap_or(1)
}
}
new_type! {
#[doc="The maximum number of uncommitted events that Nakadi will stream before pausing the stream.\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct MaxUncommittedEvents(u32, env="MAX_UNCOMMITTED_EVENTS");
}
new_type! {
#[doc="Maximum number of Events in each chunk (and therefore per partition) of the stream.\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct BatchLimit(u32, env="BATCH_LIMIT");
}
new_type! {
#[doc="Maximum number of Events in this stream \
(over all partitions being streamed in this connection).\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct StreamLimit(u32, env="STREAM_LIMIT");
}
new_type! {
#[doc="Maximum time in seconds to wait for the flushing of each chunk (per partition).\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct BatchFlushTimeoutSecs(u32, env="BATCH_FLUSH_TIMEOUT_SECS");
}
impl BatchFlushTimeoutSecs {
pub fn into_duration(self) -> Duration {
Duration::from_secs(u64::from(self.0))
}
}
impl From<BatchFlushTimeoutSecs> for Duration {
fn from(v: BatchFlushTimeoutSecs) -> Self {
v.into_duration()
}
}
new_type! {
#[doc="Useful for batching events based on their received_at timestamp.\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct BatchTimespanSecs(u32, env="BATCH_TIMESPAN_SECS");
}
impl BatchTimespanSecs {
pub fn into_duration(self) -> Duration {
Duration::from_secs(u64::from(self.0))
}
}
impl From<BatchTimespanSecs> for Duration {
fn from(v: BatchTimespanSecs) -> Self {
v.into_duration()
}
}
new_type! {
#[doc="Maximum time in seconds to wait for the flushing of each chunk (per partition).\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct StreamTimeoutSecs(u32, env="STREAM_TIMEOUT_SECS");
}
impl StreamTimeoutSecs {
pub fn into_duration(self) -> Duration {
Duration::from_secs(u64::from(self.0))
}
}
impl From<StreamTimeoutSecs> for Duration {
fn from(v: StreamTimeoutSecs) -> Self {
v.into_duration()
}
}
new_type! {
#[doc="Maximum amount of seconds that Nakadi will be waiting for commit after sending a batch to a client.\n"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub copy struct CommitTimeoutSecs(u32, env="COMMIT_TIMEOUT_SECS");
}
impl CommitTimeoutSecs {
pub fn into_duration(self) -> Duration {
Duration::from_secs(u64::from(self.0))
}
}
impl From<CommitTimeoutSecs> for Duration {
fn from(v: CommitTimeoutSecs) -> Self {
v.into_duration()
}
}