use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::ops::Deref;
use std::time::Duration;
use bytes::Bytes;
use futures::channel::mpsc::Sender;
use futures::sink::SinkExt;
use protobuf::Chars;
use serde::de::Deserialize;
use serde::ser::Serialize;
use serde_json;
use uuid::Uuid;
use crate::internal::command::Cmd;
use crate::internal::messages;
use crate::internal::messaging::Msg;
use crate::internal::package::Pkg;
use futures::Stream;
/// Failure outcomes of an operation.
///
/// The user-facing text of every variant is produced by the `Display`
/// implementation below.
#[derive(Debug, Clone)]
pub enum OperationError {
    /// Holds a stream name and the `ExpectedVersion` involved.
    WrongExpectedVersion(String, ExpectedVersion),
    /// Holds the name of the deleted stream.
    StreamDeleted(String),
    InvalidTransaction,
    /// Holds additional information about the denial.
    AccessDenied(String),
    /// Holds a description of the decoding failure.
    ProtobufDecodingError(String),
    /// Holds an optional error description.
    ServerError(Option<String>),
    /// Holds additional information about the invalid operation.
    InvalidOperation(String),
    /// Holds the name of the stream that was not found.
    StreamNotFound(String),
    AuthenticationRequired,
    Aborted,
    /// Holds an optional `Cmd`.
    // NOTE(review): presumably the unexpected command that was received — confirm.
    WrongClientImpl(Option<Cmd>),
    ConnectionHasDropped,
    NotImplemented,
}

impl std::error::Error for OperationError {}

impl std::fmt::Display for OperationError {
    /// Writes a one-line description of the error.
    ///
    /// The message deliberately carries no trailing newline so it can be
    /// embedded in log lines or wrapped by other errors without leaving a
    /// stray blank line behind.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        use OperationError::*;
        match self {
            WrongExpectedVersion(stream, exp) => {
                write!(f, "expected version {:?} for stream {}", exp, stream)
            }
            StreamDeleted(stream) => write!(f, "stream {} deleted", stream),
            InvalidTransaction => write!(f, "invalid transaction"),
            AccessDenied(info) => write!(f, "access denied: {}", info),
            ProtobufDecodingError(error) => write!(f, "protobuf decoding error: {}", error),
            ServerError(error) => write!(f, "server error: {:?}", error),
            InvalidOperation(info) => write!(f, "invalid operation: {}", info),
            StreamNotFound(stream) => write!(f, "stream {} not found", stream),
            AuthenticationRequired => write!(f, "authentication required"),
            Aborted => write!(f, "aborted"),
            WrongClientImpl(info) => write!(f, "wrong client impl: {:?}", info),
            ConnectionHasDropped => write!(f, "connection has dropped"),
            NotImplemented => write!(f, "not implemented"),
        }
    }
}
/// Retry policy expressed either as "no bound" or as an explicit count.
#[derive(Copy, Clone, Debug)]
pub enum Retry {
    /// No explicit bound; `to_usize` maps this to the largest `usize`.
    Undefinately,
    /// Carries an explicit count.
    Only(usize),
}

impl Retry {
    /// Flattens the policy into a plain number: the carried count for
    /// `Only`, the maximum `usize` value for `Undefinately`.
    pub(crate) fn to_usize(&self) -> usize {
        if let Retry::Only(count) = *self {
            count
        } else {
            usize::max_value()
        }
    }
}
/// A login / password pair, each stored as `Bytes`.
#[derive(Clone, Debug)]
pub struct Credentials {
    pub(crate) login: Bytes,
    pub(crate) password: Bytes,
}

impl Credentials {
    /// Builds credentials from two values convertible into `Bytes`.
    ///
    /// Both arguments share the single type parameter `S`, so they must be
    /// of the same type.
    pub fn new<S>(login: S, password: S) -> Credentials
    where
        S: Into<Bytes>,
    {
        let login = login.into();
        let password = password.into();
        Credentials { login, password }
    }

    /// Combined byte length of the login and the password, plus two.
    // NOTE(review): the extra 2 presumably accounts for one length byte per
    // field in the wire format — confirm against the package encoder.
    pub(crate) fn network_size(&self) -> usize {
        2 + self.login.len() + self.password.len()
    }
}
/// Two-state switch that converts to and from a raw boolean.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LinkTos {
    /// Corresponds to the raw value `true`.
    ResolveLink,
    /// Corresponds to the raw value `false`.
    NoResolution,
}

impl LinkTos {
    /// Returns `true` for `ResolveLink` and `false` for `NoResolution`.
    pub(crate) fn raw_resolve_lnk_tos(self) -> bool {
        self == LinkTos::ResolveLink
    }

    /// Inverse of `raw_resolve_lnk_tos`.
    pub(crate) fn from_bool(raw: bool) -> LinkTos {
        match raw {
            true => LinkTos::ResolveLink,
            false => LinkTos::NoResolution,
        }
    }
}
/// Tunable settings; see the `Default` implementation for the stock values.
#[derive(Clone, Debug)]
pub struct Settings {
    /// Defaults to 750 ms.
    pub heartbeat_delay: Duration,
    /// Defaults to 1500 ms.
    pub heartbeat_timeout: Duration,
    /// Defaults to 7 s.
    pub operation_timeout: Duration,
    /// Defaults to `Retry::Only(3)`.
    pub operation_retry: Retry,
    /// Defaults to `Retry::Only(3)`.
    pub connection_retry: Retry,
    /// Defaults to `None`.
    pub default_user: Option<Credentials>,
    /// Defaults to `None`.
    pub connection_name: Option<String>,
    /// Defaults to 1 s.
    pub operation_check_period: Duration,
}

impl Default for Settings {
    /// Produces the stock configuration documented on each field.
    fn default() -> Self {
        let heartbeat_delay = Duration::from_millis(750);
        let heartbeat_timeout = Duration::from_millis(1500);
        let operation_timeout = Duration::from_secs(7);
        let operation_check_period = Duration::from_secs(1);

        Settings {
            heartbeat_delay,
            heartbeat_timeout,
            operation_timeout,
            operation_retry: Retry::Only(3),
            connection_retry: Retry::Only(3),
            default_user: None,
            connection_name: None,
            operation_check_period,
        }
    }
}
/// Version expectation with a fixed `i64` encoding (see `to_i64`).
#[derive(Copy, Clone, Debug)]
pub enum ExpectedVersion {
    /// Encoded as `-2`.
    Any,
    /// Encoded as `-4`.
    StreamExists,
    /// Encoded as `-1`.
    NoStream,
    /// Encoded as the carried number itself.
    Exact(i64),
}

impl ExpectedVersion {
    /// Encodes the expectation as its raw `i64` value.
    pub(crate) fn to_i64(self) -> i64 {
        match self {
            ExpectedVersion::Exact(number) => number,
            ExpectedVersion::NoStream => -1,
            ExpectedVersion::Any => -2,
            ExpectedVersion::StreamExists => -4,
        }
    }

    /// Decodes a raw `i64`; anything other than `-1`, `-2` or `-4` becomes
    /// `Exact(ver)`.
    pub(crate) fn from_i64(ver: i64) -> ExpectedVersion {
        if ver == -1 {
            ExpectedVersion::NoStream
        } else if ver == -2 {
            ExpectedVersion::Any
        } else if ver == -4 {
            ExpectedVersion::StreamExists
        } else {
            ExpectedVersion::Exact(ver)
        }
    }
}
/// A pair of `commit` / `prepare` numbers, ordered by `commit` first and
/// `prepare` second.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Position {
    pub commit: i64,
    pub prepare: i64,
}

impl Position {
    /// Position with both fields set to `0`.
    pub fn start() -> Position {
        Position {
            commit: 0,
            prepare: 0,
        }
    }

    /// Position with both fields set to `-1`.
    // NOTE(review): presumably a sentinel understood by the server as "the
    // end" — confirm.
    pub fn end() -> Position {
        Position {
            commit: -1,
            prepare: -1,
        }
    }
}

impl PartialOrd for Position {
    /// Always `Some`: delegates to the total order from `Ord`.
    fn partial_cmp(&self, other: &Position) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Position {
    /// Compares `commit` first; `prepare` only breaks ties.
    fn cmp(&self, other: &Position) -> Ordering {
        match self.commit.cmp(&other.commit) {
            Ordering::Equal => self.prepare.cmp(&other.prepare),
            decided => decided,
        }
    }
}
/// Data reported back for a write: a version number and a `Position`.
#[derive(Debug)]
pub struct WriteResult {
    // NOTE(review): presumably the version to pass as the expected version of
    // the next write to the same stream — confirm.
    pub next_expected_version: i64,
    pub position: Position,
}
/// Outcome of reading a single event; only `Success` carries a value.
#[derive(Debug)]
pub enum ReadEventStatus<A> {
    NotFound,
    NoStream,
    Deleted,
    /// Wraps the successfully read value.
    Success(A),
}
/// A read event together with the stream id and event number it belongs to.
#[derive(Debug)]
pub struct ReadEventResult {
    pub stream_id: String,
    pub event_number: i64,
    pub event: ResolvedEvent,
}
/// An event record in client-side form.
///
/// Instances are built from a `messages::EventRecord` by
/// `RecordedEvent::new`.
#[derive(Debug)]
pub struct RecordedEvent {
    pub event_stream_id: String,
    pub event_id: Uuid,
    pub event_number: i64,
    pub event_type: String,
    /// Raw payload bytes; `as_json` deserializes from these.
    pub data: Bytes,
    /// Raw metadata bytes.
    pub metadata: Bytes,
    /// `true` when the source record's data content type equals `1`.
    pub is_json: bool,
    /// `None` when the source record has no `created` value.
    pub created: Option<i64>,
    /// `None` when the source record has no `created_epoch` value.
    pub created_epoch: Option<i64>,
}
/// Wraps a `uuid::Error` into an `io::Error` of kind `Other`, with the
/// message `"BytesError <err>"`.
fn decode_bytes_error(err: uuid::Error) -> ::std::io::Error {
    let message = format!("BytesError {}", err);
    ::std::io::Error::new(::std::io::ErrorKind::Other, message)
}
impl RecordedEvent {
    /// Converts a `messages::EventRecord` into a `RecordedEvent`.
    ///
    /// # Errors
    /// Fails when the record's event id bytes cannot be parsed by
    /// `Uuid::from_slice`; the error is wrapped by `decode_bytes_error`.
    pub(crate) fn new(mut event: messages::EventRecord) -> ::std::io::Result<RecordedEvent> {
        let event_stream_id = (*event.take_event_stream_id()).to_owned();
        let event_id = Uuid::from_slice(event.get_event_id()).map_err(decode_bytes_error)?;
        let event_number = event.get_event_number();
        let event_type = (*event.take_event_type()).to_owned();
        let data = event.take_data();
        let metadata = event.take_metadata();

        // Both timestamps are optional on the wire, hence the `has_*` checks.
        let created = match event.has_created() {
            true => Some(event.get_created()),
            false => None,
        };
        let created_epoch = match event.has_created_epoch() {
            true => Some(event.get_created_epoch()),
            false => None,
        };

        Ok(RecordedEvent {
            event_stream_id,
            event_id,
            event_number,
            event_type,
            data,
            metadata,
            // A data content type of 1 marks the payload as JSON.
            is_json: event.get_data_content_type() == 1,
            created,
            created_epoch,
        })
    }

    /// Deserializes the `data` bytes as JSON into `T`.
    ///
    /// # Errors
    /// Returns the `serde_json` error when `data` is not valid JSON for `T`.
    pub fn as_json<'a, T>(&'a self) -> serde_json::Result<T>
    where
        T: Deserialize<'a>,
    {
        let raw: &'a [u8] = &self.data;
        serde_json::from_slice(raw)
    }
}
/// An event, an optional link record, and an optional position.
#[derive(Debug)]
pub struct ResolvedEvent {
    pub event: Option<RecordedEvent>,
    pub link: Option<RecordedEvent>,
    /// `Some` when built by `new`, `None` when built by `new_from_indexed`.
    pub position: Option<Position>,
}

impl ResolvedEvent {
    /// Builds a `ResolvedEvent` from a `messages::ResolvedEvent`, including
    /// its commit / prepare position.
    ///
    /// # Errors
    /// Propagates any failure of `RecordedEvent::new` for the event or the
    /// link.
    pub(crate) fn new(mut msg: messages::ResolvedEvent) -> ::std::io::Result<ResolvedEvent> {
        let event = if msg.has_event() {
            Some(RecordedEvent::new(msg.take_event())?)
        } else {
            None
        };

        let link = if msg.has_link() {
            Some(RecordedEvent::new(msg.take_link())?)
        } else {
            None
        };

        let position = Some(Position {
            commit: msg.get_commit_position(),
            prepare: msg.get_prepare_position(),
        });

        Ok(ResolvedEvent {
            event,
            link,
            position,
        })
    }

    /// Builds a `ResolvedEvent` from a `messages::ResolvedIndexedEvent`; the
    /// resulting `position` is always `None`.
    ///
    /// # Errors
    /// Propagates any failure of `RecordedEvent::new` for the event or the
    /// link.
    pub(crate) fn new_from_indexed(
        mut msg: messages::ResolvedIndexedEvent,
    ) -> ::std::io::Result<ResolvedEvent> {
        let event = if msg.has_event() {
            Some(RecordedEvent::new(msg.take_event())?)
        } else {
            None
        };

        let link = if msg.has_link() {
            Some(RecordedEvent::new(msg.take_link())?)
        } else {
            None
        };

        Ok(ResolvedEvent {
            event,
            link,
            position: None,
        })
    }

    /// `true` only when both `event` and `link` are present.
    pub fn is_resolved(&self) -> bool {
        self.link.is_some() && self.event.is_some()
    }

    /// Returns the link when there is one, otherwise the event.
    ///
    /// # Panics
    /// Panics when both `link` and `event` are `None`.
    pub fn get_original_event(&self) -> &RecordedEvent {
        match self.link {
            Some(ref link) => link,
            None => self
                .event
                .as_ref()
                .expect("[get_original_event] Not supposed to happen!"),
        }
    }

    /// Stream id of the record returned by `get_original_event`.
    ///
    /// # Panics
    /// Panics under the same condition as `get_original_event`.
    pub fn get_original_stream_id(&self) -> &str {
        self.get_original_event().event_stream_id.as_str()
    }
}
/// Outcome of a stream metadata read.
#[derive(Debug, Clone)]
pub enum StreamMetadataResult {
    /// Carries the name of the stream concerned.
    Deleted { stream: String },
    /// Carries the name of the stream concerned.
    NotFound { stream: String },
    /// Carries the boxed metadata together with its version.
    Success(Box<VersionedMetadata>),
}
/// A `StreamMetadata` value paired with a stream name and a version number.
#[derive(Debug, Clone)]
pub struct VersionedMetadata {
    pub stream: String,
    pub version: i64,
    pub metadata: StreamMetadata,
}
/// Newtype around the raw `i64` transaction identifier.
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub struct TransactionId(pub i64);

impl TransactionId {
    /// Wraps a raw identifier.
    pub(crate) fn new(id: i64) -> TransactionId {
        Self(id)
    }
}
/// Direction of a read.
#[derive(Copy, Clone, Debug)]
pub enum ReadDirection {
    Forward,
    Backward,
}
/// Events extracted from a slice, parameterised by the location type `A`.
#[derive(Debug)]
pub enum LocatedEvents<A> {
    /// The slice held no events.
    EndOfStream,
    /// A batch of events and, when there is one, the location to read from
    /// next.
    Events {
        events: Vec<ResolvedEvent>,
        next: Option<A>,
    },
}

impl<A> LocatedEvents<A> {
    /// Returns `true` when there is nothing further to read: either the
    /// value is `EndOfStream`, or it is an `Events` batch with no `next`
    /// location.
    ///
    /// An `Events` batch that does carry a `next` location is *not* the end
    /// of the stream, because reading can continue from that location.
    pub fn is_end_of_stream(&self) -> bool {
        match *self {
            LocatedEvents::EndOfStream => true,
            LocatedEvents::Events { ref next, .. } => next.is_none(),
        }
    }
}
/// Common interface of read results that expose a batch of events.
pub trait Slice {
    /// Type used to express where a slice starts and where the next one
    /// would start.
    type Location: Copy;

    /// Location the slice was read from.
    fn from(&self) -> Self::Location;

    /// Direction the slice was read in.
    fn direction(&self) -> ReadDirection;

    /// Consumes the slice and returns its events.
    fn events(self) -> LocatedEvents<Self::Location>;
}
/// Ways a stream read can fail; every variant carries a `String`.
// NOTE(review): the carried string is presumably the stream name, or an
// error description for `Error` — confirm at the construction sites.
#[derive(Debug, Clone)]
pub enum ReadStreamError {
    NoStream(String),
    StreamDeleted(String),
    NotModified(String),
    Error(String),
    AccessDenied(String),
}
/// Result of a stream read: either a value of type `A` or a
/// `ReadStreamError`.
#[derive(Debug, Clone)]
pub enum ReadStreamStatus<A> {
    Success(A),
    Error(ReadStreamError),
}
/// A `Slice` whose locations are plain `i64` numbers.
#[derive(Debug)]
pub struct StreamSlice {
    _from: i64,
    _direction: ReadDirection,
    _events: Vec<ResolvedEvent>,
    _next_num_opt: Option<i64>,
}

impl StreamSlice {
    /// Assembles a slice from its parts.
    pub(crate) fn new(
        direction: ReadDirection,
        from: i64,
        events: Vec<ResolvedEvent>,
        next_num_opt: Option<i64>,
    ) -> StreamSlice {
        StreamSlice {
            _direction: direction,
            _from: from,
            _next_num_opt: next_num_opt,
            _events: events,
        }
    }
}

impl Slice for StreamSlice {
    type Location = i64;

    fn from(&self) -> i64 {
        self._from
    }

    fn direction(&self) -> ReadDirection {
        self._direction
    }

    /// Yields `EndOfStream` when the slice holds no events; otherwise the
    /// events together with the optional next number, passed through as is.
    fn events(self) -> LocatedEvents<i64> {
        let StreamSlice {
            _events: events,
            _next_num_opt: next,
            ..
        } = self;

        if events.is_empty() {
            return LocatedEvents::EndOfStream;
        }

        LocatedEvents::Events { events, next }
    }
}
/// A `Slice` whose locations are `Position` values.
#[derive(Debug)]
pub struct AllSlice {
    from: Position,
    direction: ReadDirection,
    events: Vec<ResolvedEvent>,
    next: Position,
}

impl AllSlice {
    /// Assembles a slice from its parts.
    pub(crate) fn new(
        direction: ReadDirection,
        from: Position,
        events: Vec<ResolvedEvent>,
        next: Position,
    ) -> AllSlice {
        AllSlice {
            direction,
            from,
            next,
            events,
        }
    }
}

impl Slice for AllSlice {
    type Location = Position;

    fn from(&self) -> Position {
        self.from
    }

    fn direction(&self) -> ReadDirection {
        self.direction
    }

    /// Yields `EndOfStream` when the slice holds no events; otherwise the
    /// events together with `Some(next)`.
    fn events(self) -> LocatedEvents<Position> {
        let AllSlice { events, next, .. } = self;

        if events.is_empty() {
            return LocatedEvents::EndOfStream;
        }

        LocatedEvents::Events {
            events,
            next: Some(next),
        }
    }
}
/// Raw bytes tagged as either JSON or binary content.
pub enum Payload {
    Json(Bytes),
    Binary(Bytes),
}

impl Payload {
    /// `true` for the `Json` variant, `false` for `Binary`.
    pub fn is_json(&self) -> bool {
        match *self {
            Payload::Json(_) => true,
            Payload::Binary(_) => false,
        }
    }

    /// Unwraps the bytes, discarding the JSON / binary tag.
    pub fn into_inner(self) -> Bytes {
        match self {
            Payload::Json(bytes) | Payload::Binary(bytes) => bytes,
        }
    }
}
/// An event to be written, assembled through chained methods and turned
/// into a `messages::NewEvent` by `build`.
pub struct EventData {
    event_type: Chars,
    payload: Payload,
    id_opt: Option<Uuid>,
    metadata_payload_opt: Option<Payload>,
    enabled_guid: bool,
}

impl EventData {
    /// Creates an event whose payload is `payload` serialized as JSON.
    ///
    /// # Errors
    /// Returns the `serde_json` error when `payload` cannot be serialized.
    pub fn json<P, S>(event_type: S, payload: P) -> serde_json::Result<EventData>
    where
        P: Serialize,
        S: AsRef<str>,
    {
        let serialized = serde_json::to_vec(&payload)?;

        Ok(EventData {
            event_type: event_type.as_ref().into(),
            payload: Payload::Json(Bytes::from(serialized)),
            id_opt: None,
            metadata_payload_opt: None,
            enabled_guid: false,
        })
    }

    /// Creates an event carrying `payload` as binary content.
    pub fn binary<S>(event_type: S, payload: Bytes) -> EventData
    where
        S: AsRef<str>,
    {
        let event_type = event_type.as_ref().into();

        EventData {
            event_type,
            payload: Payload::Binary(payload),
            id_opt: None,
            metadata_payload_opt: None,
            enabled_guid: false,
        }
    }

    /// Sets the event id; without it, `build` generates a random v4 UUID.
    pub fn id(mut self, value: Uuid) -> EventData {
        self.id_opt = Some(value);
        self
    }

    /// Sets the metadata to `payload` serialized as JSON.
    ///
    /// # Panics
    /// Panics when `payload` cannot be serialized to JSON.
    pub fn metadata_as_json<P>(mut self, payload: P) -> EventData
    where
        P: Serialize,
    {
        let serialized = serde_json::to_vec(&payload).unwrap();
        self.metadata_payload_opt = Some(Payload::Json(Bytes::from(serialized)));
        self
    }

    /// Sets the metadata to `payload` as binary content.
    pub fn metadata_as_binary(mut self, payload: Bytes) -> EventData {
        self.metadata_payload_opt = Some(Payload::Binary(payload));
        self
    }

    /// Makes `build` encode the event id through `guid::from_uuid` instead
    /// of using the UUID's bytes as they are.
    pub fn convert_uuid_to_guid(mut self) -> EventData {
        self.enabled_guid = true;
        self
    }

    /// Converts this value into a `messages::NewEvent`.
    ///
    /// Content types are written as `1` for JSON and `0` for binary; when no
    /// metadata was provided only the metadata content type (`0`) is set.
    pub(crate) fn build(self) -> messages::NewEvent {
        let mut new_event = messages::NewEvent::new();

        let id = self.id_opt.unwrap_or_else(Uuid::new_v4);
        let raw_id: Bytes = if self.enabled_guid {
            guid::from_uuid(id)
        } else {
            id.as_bytes().to_vec().into()
        };
        new_event.set_event_id(raw_id);

        let (data_type, data) = match self.payload {
            Payload::Json(bin) => (1, bin),
            Payload::Binary(bin) => (0, bin),
        };
        new_event.set_data_content_type(data_type);
        new_event.set_data(data);

        if let Some(metadata) = self.metadata_payload_opt {
            let (metadata_type, bin) = match metadata {
                Payload::Json(bin) => (1, bin),
                Payload::Binary(bin) => (0, bin),
            };
            new_event.set_metadata_content_type(metadata_type);
            new_event.set_metadata(bin);
        } else {
            new_event.set_metadata_content_type(0);
        }

        new_event.set_event_type(self.event_type);
        new_event
    }
}
/// Chained builder for `StreamMetadata`; every setting starts out unset.
#[derive(Default)]
pub struct StreamMetadataBuilder {
    max_count: Option<u64>,
    max_age: Option<Duration>,
    truncate_before: Option<u64>,
    cache_control: Option<Duration>,
    acl: Option<StreamAcl>,
    properties: HashMap<String, serde_json::Value>,
}

impl StreamMetadataBuilder {
    /// Creates a builder with nothing set.
    pub fn new() -> StreamMetadataBuilder {
        StreamMetadataBuilder::default()
    }

    /// Sets `max_count`.
    pub fn max_count(mut self, value: u64) -> StreamMetadataBuilder {
        self.max_count = Some(value);
        self
    }

    /// Sets `max_age`.
    pub fn max_age(mut self, value: Duration) -> StreamMetadataBuilder {
        self.max_age = Some(value);
        self
    }

    /// Sets `truncate_before`.
    pub fn truncate_before(mut self, value: u64) -> StreamMetadataBuilder {
        self.truncate_before = Some(value);
        self
    }

    /// Sets `cache_control`.
    pub fn cache_control(mut self, value: Duration) -> StreamMetadataBuilder {
        self.cache_control = Some(value);
        self
    }

    /// Sets the access control list.
    pub fn acl(mut self, value: StreamAcl) -> StreamMetadataBuilder {
        self.acl = Some(value);
        self
    }

    /// Stores `value`, converted to a JSON value, under `key`; an existing
    /// entry with the same key is replaced.
    ///
    /// # Panics
    /// Panics when `value` cannot be converted to a JSON value.
    pub fn insert_custom_property<V>(mut self, key: String, value: V) -> StreamMetadataBuilder
    where
        V: Serialize,
    {
        let json_value = serde_json::to_value(value).unwrap();
        self.properties.insert(key, json_value);
        self
    }

    /// Produces the `StreamMetadata`; a missing ACL becomes
    /// `StreamAcl::default()`.
    pub fn build(self) -> StreamMetadata {
        let StreamMetadataBuilder {
            max_count,
            max_age,
            truncate_before,
            cache_control,
            acl,
            properties,
        } = self;

        StreamMetadata {
            max_count,
            max_age,
            truncate_before,
            cache_control,
            acl: acl.unwrap_or_default(),
            custom_properties: properties,
        }
    }
}
/// Metadata of a stream; usually assembled through `StreamMetadataBuilder`.
#[derive(Debug, Default, Clone)]
pub struct StreamMetadata {
    pub max_count: Option<u64>,
    pub max_age: Option<Duration>,
    pub truncate_before: Option<u64>,
    pub cache_control: Option<Duration>,
    pub acl: StreamAcl,
    /// Free-form properties stored as JSON values.
    pub custom_properties: HashMap<String, serde_json::Value>,
}

impl StreamMetadata {
    /// Shortcut for `StreamMetadataBuilder::new()`.
    pub fn builder() -> StreamMetadataBuilder {
        StreamMetadataBuilder::new()
    }
}
/// Access control list: an optional list of role names per kind of access.
/// Every list is `None` by default.
#[derive(Default, Debug, Clone)]
pub struct StreamAcl {
    pub read_roles: Option<Vec<String>>,
    pub write_roles: Option<Vec<String>>,
    pub delete_roles: Option<Vec<String>>,
    pub meta_read_roles: Option<Vec<String>>,
    pub meta_write_roles: Option<Vec<String>>,
}
/// Reading half of a persistent subscription: wraps a boxed stream of
/// `PersistentSubEvent`.
pub struct PersistentSubRead {
    pub(crate) inner: Box<dyn Stream<Item = PersistentSubEvent> + Send + Unpin>,
}

impl PersistentSubRead {
    /// Gives up the wrapper and returns the underlying boxed stream.
    pub fn into_inner(self) -> Box<dyn Stream<Item = PersistentSubEvent> + Send + Unpin> {
        let PersistentSubRead { inner } = self;
        inner
    }

    /// Waits for the next event; `None` once the underlying stream is
    /// exhausted.
    pub async fn read_next(&mut self) -> Option<PersistentSubEvent> {
        futures::stream::StreamExt::next(&mut self.inner).await
    }
}
/// Writing half of a persistent subscription: sends ack / nak packages for
/// the subscription identified by `sub_id` through `sender`.
pub struct PersistentSubWrite {
    pub(crate) sub_id: protobuf::Chars,
    pub(crate) sender: Sender<Msg>,
}

impl PersistentSubWrite {
    /// Sends one ack package covering all `ids`.
    ///
    /// A failure to hand the package to `sender` is ignored.
    ///
    /// # Panics
    /// Panics when the ack message cannot be serialized into a package.
    async fn ack<I>(&mut self, ids: I)
    where
        I: Iterator<Item = Uuid>,
    {
        let mut ack_msg = messages::PersistentSubscriptionAckEvents::new();
        ack_msg.set_processed_event_ids(ids.map(|id| id.as_bytes().to_vec().into()).collect());
        ack_msg.set_subscription_id(self.sub_id.clone());

        let package = Pkg::from_message(Cmd::PersistentSubscriptionAckEvents, None, &ack_msg)
            .expect("We expect serializing ack message will succeed");

        // Best effort: the send result is intentionally discarded.
        let _ = self.sender.send(Msg::Send(package)).await;
    }

    /// Sends one nak package covering all `ids`, with the given action and
    /// reason.
    ///
    /// A failure to hand the package to `sender` is ignored.
    ///
    /// # Panics
    /// Panics when the nak message cannot be serialized into a package.
    async fn nak<I, S>(&mut self, ids: I, action: NakAction, reason: S)
    where
        I: Iterator<Item = Uuid>,
        S: AsRef<str>,
    {
        let mut nak_msg = messages::PersistentSubscriptionNakEvents::new();
        nak_msg.set_processed_event_ids(ids.map(|id| id.as_bytes().to_vec().into()).collect());
        nak_msg.set_subscription_id(self.sub_id.clone());
        nak_msg.set_message(reason.as_ref().into());
        nak_msg.set_action(action.build_internal_nak_action());

        let package = Pkg::from_message(Cmd::PersistentSubscriptionNakEvents, None, &nak_msg)
            .expect("We expect serializing nak message will succeed");

        // Best effort: the send result is intentionally discarded.
        let _ = self.sender.send(Msg::Send(package)).await;
    }

    /// Acknowledges every event of `events`, using the id of each event's
    /// original record (see `ResolvedEvent::get_original_event`).
    pub async fn ack_events<I>(&mut self, events: I)
    where
        I: Iterator<Item = PersistentSubEvent>,
    {
        let ids = events.map(|event| event.inner.get_original_event().event_id);
        self.ack(ids).await
    }

    /// Negatively acknowledges every event of `events` with the given
    /// action and reason.
    pub async fn nak_events<I, S>(&mut self, events: I, action: NakAction, reason: S)
    where
        I: Iterator<Item = PersistentSubEvent>,
        S: AsRef<str>,
    {
        self.nak(
            events.map(|event| event.inner.get_original_event().event_id),
            action,
            reason,
        )
        .await
    }

    /// Acknowledges a single event.
    pub async fn ack_event(&mut self, event: PersistentSubEvent) {
        self.ack_events(std::iter::once(event)).await
    }

    /// Negatively acknowledges a single event.
    pub async fn nak_event<S>(&mut self, event: PersistentSubEvent, action: NakAction, reason: S)
    where
        S: AsRef<str>,
    {
        self.nak_events(std::iter::once(event), action, reason)
            .await
    }
}
/// Notifications produced by a subscription.
pub enum SubEvent {
    Confirmed,
    /// An event arrived, along with a retry count.
    EventAppeared {
        event: Box<ResolvedEvent>,
        retry_count: usize,
    },
    Dropped,
}
/// Narrows a stream of `SubEvent` down to the events themselves:
/// `EventAppeared` items are unboxed and passed on, `Confirmed` and
/// `Dropped` items are discarded.
pub(crate) fn keep_subscription_events_only<S>(
    stream: S,
) -> impl Stream<Item = ResolvedEvent> + Send + Unpin
where
    S: Stream<Item = SubEvent> + Send + Unpin,
{
    use futures::stream::StreamExt;

    stream.filter_map(|notification| {
        futures::future::ready(match notification {
            SubEvent::EventAppeared { event, .. } => Some(*event),
            SubEvent::Confirmed | SubEvent::Dropped => None,
        })
    })
}
/// An event delivered by a persistent subscription, with its retry count.
#[derive(Debug)]
pub struct PersistentSubEvent {
    pub inner: ResolvedEvent,
    pub retry_count: usize,
}
/// Action attached to a negative acknowledgement.
#[derive(Debug, PartialEq, Eq)]
pub enum NakAction {
    Unknown,
    Park,
    Retry,
    Skip,
    Stop,
}

impl NakAction {
    /// Maps each variant to the protobuf enum variant of the same name.
    fn build_internal_nak_action(self) -> messages::PersistentSubscriptionNakEvents_NakAction {
        type Raw = messages::PersistentSubscriptionNakEvents_NakAction;

        match self {
            NakAction::Unknown => Raw::Unknown,
            NakAction::Park => Raw::Park,
            NakAction::Retry => Raw::Retry,
            NakAction::Skip => Raw::Skip,
            NakAction::Stop => Raw::Stop,
        }
    }
}
/// Named consumer strategies; `as_str` gives the textual name of each.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum SystemConsumerStrategy {
    DispatchToSingle,
    RoundRobin,
    Pinned,
}

impl SystemConsumerStrategy {
    /// Textual name of the strategy, identical to the variant name.
    pub(crate) fn as_str(&self) -> &str {
        match self {
            SystemConsumerStrategy::Pinned => "Pinned",
            SystemConsumerStrategy::RoundRobin => "RoundRobin",
            SystemConsumerStrategy::DispatchToSingle => "DispatchToSingle",
        }
    }
}
/// Settings of a persistent subscription; see the `Default` implementation
/// for the stock values.
#[derive(Debug, Clone, Copy)]
pub struct PersistentSubscriptionSettings {
    /// Defaults to `false`.
    pub resolve_link_tos: bool,
    /// Defaults to `-1`.
    pub start_from: i64,
    /// Defaults to `false`.
    pub extra_stats: bool,
    /// Defaults to 30 s.
    pub msg_timeout: Duration,
    /// Defaults to `10`.
    pub max_retry_count: u16,
    /// Defaults to `500`.
    pub live_buf_size: u16,
    /// Defaults to `20`.
    pub read_batch_size: u16,
    /// Defaults to `500`.
    pub history_buf_size: u16,
    /// Defaults to 2 s.
    pub checkpoint_after: Duration,
    /// Defaults to `10`.
    pub min_checkpoint_count: u16,
    /// Defaults to `1000`.
    pub max_checkpoint_count: u16,
    /// Defaults to `0`.
    pub max_subs_count: u16,
    /// Defaults to `SystemConsumerStrategy::RoundRobin`.
    pub named_consumer_strategy: SystemConsumerStrategy,
}

impl PersistentSubscriptionSettings {
    /// Stock settings; identical to what the `Default` trait yields.
    pub fn default() -> PersistentSubscriptionSettings {
        <PersistentSubscriptionSettings as Default>::default()
    }
}

impl Default for PersistentSubscriptionSettings {
    /// Produces the stock values documented on each field.
    fn default() -> PersistentSubscriptionSettings {
        let msg_timeout = Duration::from_secs(30);
        let checkpoint_after = Duration::from_secs(2);

        PersistentSubscriptionSettings {
            resolve_link_tos: false,
            start_from: -1,
            extra_stats: false,
            msg_timeout,
            max_retry_count: 10,
            live_buf_size: 500,
            read_batch_size: 20,
            history_buf_size: 500,
            checkpoint_after,
            min_checkpoint_count: 10,
            max_checkpoint_count: 1000,
            max_subs_count: 0,
            named_consumer_strategy: SystemConsumerStrategy::RoundRobin,
        }
    }
}
/// Outcome of a persistent subscription action.
#[derive(Debug, Eq, PartialEq)]
pub enum PersistActionResult {
    Success,
    /// Carries the reason for the failure.
    Failure(PersistActionError),
}

impl PersistActionResult {
    /// `true` only for `Success`.
    pub fn is_success(&self) -> bool {
        *self == PersistActionResult::Success
    }

    /// `true` for any `Failure`.
    pub fn is_failure(&self) -> bool {
        match *self {
            PersistActionResult::Failure(_) => true,
            PersistActionResult::Success => false,
        }
    }
}
/// Reasons a persistent subscription action can fail.
#[derive(Debug, Eq, PartialEq)]
pub enum PersistActionError {
    Fail,
    AlreadyExists,
    DoesNotExist,
    AccessDenied,
}
/// Thin wrapper around a `SocketAddr`.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct Endpoint {
    pub addr: SocketAddr,
}

impl std::fmt::Display for Endpoint {
    /// Prints the wrapped socket address.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.addr))
    }
}

impl Endpoint {
    /// Wraps `addr`.
    pub(crate) fn from_addr(addr: SocketAddr) -> Endpoint {
        Self { addr }
    }
}
/// A gossip seed, identified by its `Endpoint`.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct GossipSeed {
    pub(crate) endpoint: Endpoint,
}

impl std::fmt::Display for GossipSeed {
    /// Prints `endpoint: <socket address>`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("endpoint: {}", self.endpoint.addr))
    }
}

impl GossipSeed {
    /// Creates a seed from the first socket address `addrs` resolves to.
    ///
    /// # Errors
    /// Propagates the resolution error, or returns an error of kind
    /// `NotFound` when resolution yields no address at all.
    pub fn new<A>(addrs: A) -> std::io::Result<GossipSeed>
    where
        A: ToSocketAddrs,
    {
        match addrs.to_socket_addrs()?.next() {
            Some(addr) => Ok(GossipSeed::from_endpoint(Endpoint { addr })),
            None => Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                "Failed to resolve socket address.",
            )),
        }
    }

    /// Wraps an existing endpoint.
    pub(crate) fn from_endpoint(endpoint: Endpoint) -> GossipSeed {
        GossipSeed { endpoint }
    }

    /// Wraps a socket address.
    pub(crate) fn from_socket_addr(addr: SocketAddr) -> GossipSeed {
        let endpoint = Endpoint::from_addr(addr);
        GossipSeed::from_endpoint(endpoint)
    }

    /// Builds the URL `http://<addr>/gossip?format=json` for this seed.
    ///
    /// # Errors
    /// Returns an error of kind `InvalidInput` when the URL does not parse.
    pub(crate) fn url(self) -> std::io::Result<reqwest::Url> {
        let url_str = format!("http://{}/gossip?format=json", self.endpoint.addr);

        match reqwest::Url::parse(&url_str) {
            Ok(url) => Ok(url),
            Err(error) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("Wrong url [{}]: {}", url_str, error),
            )),
        }
    }
}
/// Which kind of node is preferred.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum NodePreference {
    Master,
    Slave,
    Random,
}

impl std::fmt::Display for NodePreference {
    /// Prints the variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match *self {
            NodePreference::Master => "Master",
            NodePreference::Slave => "Slave",
            NodePreference::Random => "Random",
        };
        f.write_str(label)
    }
}
/// Cluster discovery settings based on a non-empty list of gossip seeds.
#[derive(Debug)]
pub struct GossipSeedClusterSettings {
    pub(crate) seeds: vec1::Vec1<GossipSeed>,
    pub(crate) preference: NodePreference,
    pub(crate) gossip_timeout: Duration,
    pub(crate) max_discover_attempts: usize,
}

impl GossipSeedClusterSettings {
    /// Creates settings for `seeds` with a `Random` node preference, a 1 s
    /// gossip timeout and at most 10 discovery attempts.
    pub fn new(seeds: vec1::Vec1<GossipSeed>) -> GossipSeedClusterSettings {
        let gossip_timeout = Duration::from_secs(1);

        GossipSeedClusterSettings {
            seeds,
            preference: NodePreference::Random,
            gossip_timeout,
            max_discover_attempts: 10,
        }
    }

    /// Replaces the gossip timeout.
    pub fn set_gossip_timeout(mut self, gossip_timeout: Duration) -> GossipSeedClusterSettings {
        self.gossip_timeout = gossip_timeout;
        self
    }

    /// Replaces the maximum number of discovery attempts.
    pub fn set_max_discover_attempts(mut self, max_attempt: usize) -> GossipSeedClusterSettings {
        self.max_discover_attempts = max_attempt;
        self
    }
}
/// Conversions between a `Uuid` and the byte layout referred to here as a
/// GUID: the first four bytes reversed, the next two pairs of bytes swapped,
/// and the last eight bytes left as they are.
mod guid {
    use bytes::Bytes;
    use uuid::Uuid;

    /// Re-orders the 16 bytes of `uuid` into the GUID layout.
    pub(crate) fn from_uuid(uuid: Uuid) -> Bytes {
        let b = uuid.as_bytes();
        Bytes::from(vec![
            b[3], b[2], b[1], b[0], b[5], b[4], b[7], b[6], b[8], b[9], b[10], b[11], b[12], b[13],
            b[14], b[15],
        ])
    }

    /// Re-orders GUID-layout bytes back into a `Uuid`.
    ///
    /// # Errors
    /// Returns an error when `b` is not exactly 16 bytes long, instead of
    /// panicking on a short slice or silently ignoring trailing bytes.
    fn _to_uuid(b: &[u8]) -> Result<Uuid, uuid::Error> {
        if b.len() != 16 {
            // `Uuid::from_slice` rejects any slice that is not 16 bytes
            // long, which yields the appropriate length error without
            // indexing out of bounds below.
            return Uuid::from_slice(b);
        }

        Uuid::from_slice(&[
            b[3], b[2], b[1], b[0], b[5], b[4], b[7], b[6], b[8], b[9], b[10], b[11], b[12], b[13],
            b[14], b[15],
        ])
    }

    #[cfg(test)]
    mod test {
        use bytes::Bytes;
        use uuid::Uuid;

        #[test]
        fn test_from_uuid() {
            let uuid = Uuid::from_bytes([
                60, 213, 58, 216, 84, 211, 79, 74, 177, 22, 31, 9, 149, 122, 243, 48,
            ]);
            let expected_guid = Bytes::from_static(&[
                216, 58, 213, 60, 211, 84, 74, 79, 177, 22, 31, 9, 149, 122, 243, 48,
            ]);
            assert_eq!(super::from_uuid(uuid), expected_guid);
        }

        #[test]
        fn test_to_uuid() {
            let guid = &[
                216, 58, 213, 60, 211, 84, 74, 79, 177, 22, 31, 9, 149, 122, 243, 48,
            ];
            let expected_uuid = Uuid::from_bytes([
                60, 213, 58, 216, 84, 211, 79, 74, 177, 22, 31, 9, 149, 122, 243, 48,
            ]);
            assert_eq!(super::_to_uuid(guid), Ok(expected_uuid));
        }

        #[test]
        fn test_to_uuid_rejects_wrong_length() {
            assert!(super::_to_uuid(&[]).is_err());
            assert!(super::_to_uuid(&[1, 2, 3]).is_err());
            assert!(super::_to_uuid(&[0; 17]).is_err());
        }
    }
}