use heapless::{LinearMap, Vec};
use log::*;
use serde::{Deserialize, Serialize};
use crate::communication::connection::{Address, Connection};
use crate::communication::messages::Package;
use crate::communication::messages::{
IdentificableMessage, SimpleCodifier, MEMBERSHIP_MESSAGE_OPCODE,
};
use crate::communication::{ConnectionError, WriteError};
use crate::membership::metadata::{BasicMetadata, CommunicationMetadata, NodeMetadata};
use crate::membership::MembershipServiceError;
use crate::nodes::SystemNodeId;
use crate::properties::{init_buffer, AddressesBytesVec, MAX_NODE_COUNT};
/// A [`Package`] whose first type parameter is fixed to [`SystemNodeId`] and whose second is
/// the coordination message type chosen by the caller.
// NOTE(review): `clippy::result_large_err` is normally reported on functions returning
// `Result`; confirm that this `allow` has any effect on a type alias.
#[allow(clippy::result_large_err)]
pub type CoordinationPackage<CoordMessage> = Package<SystemNodeId, CoordMessage>;
/// Messages exchanged between a membership client and the Membership Service.
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
#[derive(Eq, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub enum MembershipMessage {
/// Liveness report; `BaseMembershipClient::keep_alive` sends it carrying the registered port.
KeepAlive(u16),
/// Registration request; `BaseMembershipClient::subscribe` sends it carrying the registered port.
RegisterReq(u16),
/// Payload-less registration response (presumably the answer to `RegisterReq` — TODO confirm).
RegisterResp,
/// Discovery request; `BaseMembershipClient::find_members` sends it carrying the registered port.
DiscoveryReq(u16),
/// Discovery response carrying encoded member addresses; `find_members` decodes the payload
/// with its address codifier.
DiscoveryResp(AddressesBytesVec),
}
/// Identifies every [`MembershipMessage`] with the membership opcode.
impl IdentificableMessage for MembershipMessage {
/// Returns `MEMBERSHIP_MESSAGE_OPCODE`, regardless of the variant.
fn id() -> u8 {
MEMBERSHIP_MESSAGE_OPCODE
}
}
/// Fixed-capacity (`heapless`) list of member addresses holding at most `MAX_NODE_COUNT` entries.
pub type MembersAddresses<Addr> = Vec<Addr, MAX_NODE_COUNT>;
/// Client-side operations against the Membership Service.
pub trait MembershipClient {
/// Address type used to identify members.
type Address: Address;
/// Reports this node as alive. Nothing is returned, so failures are not surfaced to the caller.
fn keep_alive(&mut self);
/// Registers this node with the Membership Service.
///
/// # Errors
/// Returns a [`MembershipServiceError`] when the registration could not be carried out.
fn subscribe(&mut self) -> Result<(), MembershipServiceError>;
/// Queries the Membership Service for the current members.
///
/// Returns a pair of address lists. In `BaseMembershipClient` the first list holds the
/// addresses that disappeared since the previous query and the second holds the newly found
/// ones.
fn find_members(
&mut self,
) -> (
MembersAddresses<Self::Address>,
MembersAddresses<Self::Address>,
);
}
/// [`MembershipClient`] implementation that reaches the Membership Service through a
/// [`Connection`] and encodes/decodes messages and addresses through [`SimpleCodifier`]s.
pub struct BaseMembershipClient<
MsgCod: SimpleCodifier,
Conn: Connection,
Addr: Address,
AddrCod: SimpleCodifier,
> {
/// Port carried by every request this client sends (keep-alive, register, discovery).
registered_port: u16,
/// Member addresses kept from the most recent successfully decoded discovery response.
members_addresses: MembersAddresses<Addr>,
/// Decodes the address payload of a `MembershipMessage::DiscoveryResp`.
address_codifier: AddrCod,
/// Encodes outgoing messages and decodes incoming ones.
message_codifier: MsgCod,
/// Transport used to talk to the Membership Service.
connection: Conn,
}
impl<
        MsgCod: SimpleCodifier<Data = MembershipMessage>,
        Conn: Connection,
        Addr: Address,
        AddrCod: SimpleCodifier<Data = MembersAddresses<Addr>>,
    > BaseMembershipClient<MsgCod, Conn, Addr, AddrCod>
{
    /// Creates a client that does not know any member yet.
    ///
    /// * `port` - port carried by every request sent to the Membership Service.
    /// * `address_codifier` - decodes the address payload of discovery responses.
    /// * `message_codifier` - encodes outgoing and decodes incoming [`MembershipMessage`]s.
    /// * `connection` - transport used to reach the Membership Service.
    pub fn new(
        port: u16,
        address_codifier: AddrCod,
        message_codifier: MsgCod,
        connection: Conn,
    ) -> Self {
        Self {
            registered_port: port,
            members_addresses: Default::default(),
            address_codifier,
            message_codifier,
            connection,
        }
    }

    /// Delegates to the underlying connection's `reconnect`.
    fn reconnect(&mut self) -> Result<(), ConnectionError> {
        self.connection.reconnect()
    }

    /// Encodes `message` into a fresh buffer and writes the encoded bytes to the connection.
    ///
    /// # Errors
    /// Returns `ConnectionError::WriteError(WriteError::CodificationError(_))` when encoding
    /// fails, or the error reported by the connection when the write fails (also logged).
    fn send_message(&mut self, message: &MsgCod::Data) -> Result<(), ConnectionError> {
        let mut buffer = init_buffer();
        let bytes = self
            .message_codifier
            .encode(message, &mut buffer)
            .map_err(|err| ConnectionError::WriteError(WriteError::CodificationError(err)))?;
        match self.connection.write(&buffer[..bytes]) {
            Ok(bytes) => {
                trace!("Wrote {:?} bytes to Membership Service.", bytes);
                Ok(())
            }
            Err(err) => {
                error!(
                    "Connection error while writing to the Membership Service: {:?}",
                    err
                );
                Err(err)
            }
        }
    }

    /// Reads once from the connection and decodes what was received.
    ///
    /// Returns `None` when the read fails, when it yields zero bytes, or when decoding fails.
    fn receive_message(&mut self) -> Option<MsgCod::Data> {
        let mut buffer = init_buffer();
        if let Ok(bytes) = self.connection.read(&mut buffer) {
            if bytes > 0 {
                return self.message_codifier.decode(&buffer[0..bytes]).ok();
            }
        }
        None
    }

    /// Reconciles the stored members with `current_addresses`.
    ///
    /// Stored addresses missing from `current_addresses` are dropped, and addresses not stored
    /// yet are appended. Returns `(removed, added)`.
    fn update_members(
        &mut self,
        current_addresses: MembersAddresses<Addr>,
    ) -> (MembersAddresses<Addr>, MembersAddresses<Addr>) {
        // Collect what is about to be dropped before `retain` discards it.
        let mut removed_addresses = MembersAddresses::<Addr>::new();
        for old_address in self.members_addresses.iter() {
            if !current_addresses.contains(old_address) {
                info!(
                    "A stored address wasn't received from the Discovery service: {}",
                    old_address
                );
                if removed_addresses.push(*old_address).is_err() {
                    warn!("Couldn't add address {} to remove queue.", old_address);
                }
            }
        }
        self.members_addresses
            .retain(|addr| current_addresses.contains(addr));

        // NOTE(review): the membership check below only looks at the stored members, so an
        // address repeated inside `current_addresses` is reported (and stored) more than once.
        let mut new_addresses = MembersAddresses::<Addr>::new();
        for current_address in current_addresses {
            if !self.members_addresses.contains(&current_address)
                && new_addresses.push(current_address).is_err()
            {
                warn!("Couldn't add address {} to add queue.", current_address);
            }
        }
        if self
            .members_addresses
            .extend_from_slice(new_addresses.as_slice())
            .is_err()
        {
            error!(
                "Max capacity reached, couldn't store {} new IPs.",
                new_addresses.len()
            );
        }
        (removed_addresses, new_addresses)
    }
}
impl<
MsgCod: SimpleCodifier<Data= MembershipMessage>,
Conn: Connection,
Addr: Address,
AddrCod: SimpleCodifier<Data= MembersAddresses<Addr>>,
> MembershipClient for BaseMembershipClient<MsgCod, Conn, Addr, AddrCod>
{
type Address = Addr;
fn keep_alive(&mut self) {
debug!("Reporting as alive to Membership Service.");
if self.reconnect().is_err() {
warn!(
"Couldn't reconnect to listener. Using previous connection for retrieving members."
)
}
if self
.send_message(&MembershipMessage::KeepAlive(self.registered_port))
.is_err()
{
error!("Couldn't report node liveness to Membership Service.")
}
}
fn subscribe(&mut self) -> Result<(), MembershipServiceError> {
let message = MembershipMessage::RegisterReq(self.registered_port);
match self.reconnect() {
Ok(_) => self
.send_message(&message)
.map_err(MembershipServiceError::ConnectionError),
Err(err) => {
error!(
"Couldn't establish connection to Membership Service: {:?}",
err
);
Err(MembershipServiceError::ConnectionError(err))
}
}
}
fn find_members(
&mut self,
) -> (
MembersAddresses<Self::Address>,
MembersAddresses<Self::Address>,
) {
if self.reconnect().is_err() {
warn!(
"Couldn't reconnect to listener. Using previous connection for retrieving members."
);
}
trace!("Asking for members!");
let message = MembershipMessage::DiscoveryReq(self.registered_port);
if self.send_message(&message).is_ok() {
if let Some(MembershipMessage::DiscoveryResp(addresses)) = self.receive_message() {
match self.address_codifier.decode(addresses.as_slice()) {
Ok(addresses) => {
let (removed_members, new_members) = self.update_members(addresses);
let mut found_members = MembersAddresses::<Self::Address>::new();
if found_members
.extend_from_slice(new_members.as_slice())
.is_err()
{
error!("There where {} new found members from Discovery, exceeding the max capacity.", found_members.len())
}
let mut disconnected_members = MembersAddresses::<Self::Address>::new();
if disconnected_members
.extend_from_slice(removed_members.as_slice())
.is_err()
{
error!("There where {} new disconnected members from Discovery, exceeding the max capacity.", disconnected_members.len())
}
return (disconnected_members, found_members);
}
Err(err) => {
error!("Couldn't deserialize addresses: {:?}", err);
}
}
}
};
warn!("Couldn't retrieve response from MembershipService. Using previous members.");
(
MembersAddresses::<Self::Address>::default(),
self.members_addresses.clone(),
)
}
}
/// Extends [`MembershipClient`] with a registry of members keyed by [`SystemNodeId`].
///
/// `SIZE` is the capacity of the map returned by `members`.
pub trait MembershipManager<const SIZE: usize>: MembershipClient {
/// Metadata kept for each registered member.
type NodeMetadata: BasicMetadata;
/// Registers `node_id` together with its `metadata`.
// NOTE(review): behaviour for an already registered id is up to the implementor — confirm.
fn register_member(&mut self, node_id: SystemNodeId, metadata: Self::NodeMetadata);
/// Removes `node_id` from the registry (presumably a no-op for unknown ids — TODO confirm).
fn unregister_member(&mut self, node_id: SystemNodeId);
/// Returns the registered members as an owned map from node id to metadata.
fn members(&self) -> LinearMap<SystemNodeId, Self::NodeMetadata, SIZE>;
/// Returns the id of a member picked through the `select` predicate, or `None`
/// (presumably when no member's metadata satisfies it — TODO confirm).
fn select_member(&self, select: fn(&Self::NodeMetadata) -> bool) -> Option<SystemNodeId>;
}
/// Pairs a node id with that node's coordination and communication metadata.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct NodeMembershipEntry<CoordMetadata: BasicMetadata, CommMetadata: CommunicationMetadata> {
/// Identifier of the node this entry describes.
pub id: SystemNodeId,
/// Metadata associated with the node.
pub metadata: NodeMetadata<CoordMetadata, CommMetadata>,
}
#[cfg(test)]
mod test {
    use crate::communication::connection::{Connection, Read, Write};
    use crate::communication::messages::SimpleCodifier;
    use crate::communication::ConnectionError;
    use crate::membership::client::{
        BaseMembershipClient, MembersAddresses, MembershipClient, MembershipMessage,
    };
    use crate::properties::AddressesBytesVec;
    use crate::CodificationError;

    /// Addresses are plain `u16` values in these tests.
    type FakeAddress = u16;

    /// Connection stub: every operation succeeds and reports a single transferred byte.
    #[derive(Default)]
    struct FakeConnection;

    impl Read for FakeConnection {
        fn read(&mut self, _buf: &mut [u8]) -> Result<usize, ConnectionError> {
            Ok(1)
        }
    }

    impl Write for FakeConnection {
        fn write(&mut self, _buf: &[u8]) -> Result<usize, ConnectionError> {
            Ok(1)
        }
    }

    impl Connection for FakeConnection {
        fn reconnect(&mut self) -> Result<(), ConnectionError> {
            Ok(())
        }
    }

    /// Message codifier stub: decoding always yields a clone of the canned message.
    struct FakeMessageCodifier {
        message: MembershipMessage,
    }

    impl FakeMessageCodifier {
        pub fn with_message(message: MembershipMessage) -> Self {
            FakeMessageCodifier { message }
        }
    }

    impl Default for FakeMessageCodifier {
        fn default() -> Self {
            Self::with_message(MembershipMessage::RegisterResp)
        }
    }

    impl SimpleCodifier for FakeMessageCodifier {
        type Data = MembershipMessage;

        fn decode(&self, _input: &[u8]) -> Result<Self::Data, CodificationError> {
            let canned = self.message.clone();
            Ok(canned)
        }

        fn encode(
            &self,
            _msg: &Self::Data,
            _buffer: &mut [u8],
        ) -> Result<usize, CodificationError> {
            Ok(1)
        }
    }

    /// Address codifier stub: decoding always yields a clone of the canned address list.
    #[derive(Default)]
    struct FakeAddressCodifier {
        addresses: MembersAddresses<FakeAddress>,
    }

    impl FakeAddressCodifier {
        fn with_addresses(addresses: MembersAddresses<FakeAddress>) -> Self {
            FakeAddressCodifier { addresses }
        }
    }

    impl SimpleCodifier for FakeAddressCodifier {
        type Data = MembersAddresses<FakeAddress>;

        fn decode(&self, _input: &[u8]) -> Result<Self::Data, CodificationError> {
            let canned = self.addresses.clone();
            Ok(canned)
        }

        fn encode(
            &self,
            _msg: &Self::Data,
            _buffer: &mut [u8],
        ) -> Result<usize, CodificationError> {
            Ok(1)
        }
    }

    /// An address decoded from the discovery response is reported among the new members.
    #[test]
    fn membership_manager_can_get_new_members() {
        let expected: u16 = 1;

        let mut advertised = MembersAddresses::new();
        advertised.push(expected).unwrap();

        let discovery_response = MembershipMessage::DiscoveryResp(AddressesBytesVec::default());
        let mut client = BaseMembershipClient::new(
            0,
            FakeAddressCodifier::with_addresses(advertised),
            FakeMessageCodifier::with_message(discovery_response),
            FakeConnection::default(),
        );

        let (_removed, found) = client.find_members();
        assert!(found.contains(&expected));
    }
}