pub mod auth;
pub mod client;
pub mod node_session;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::net::IpAddr;
pub use node_session::NodeSession;
use ractor::Actor;
use ractor::ActorId;
use ractor::ActorProcessingErr;
use ractor::ActorRef;
use ractor::RpcReplyPort;
use ractor::SupervisionEvent;
use crate::net::IncomingEncryptionMode;
use crate::protocol::auth as auth_protocol;
use crate::NodeId;
use crate::RactorMessage;
const PROTOCOL_VERSION: u32 = 1;
#[derive(Debug)]
pub enum SessionCheckReply {
NoOtherConnection,
OtherConnectionContinues,
ThisConnectionContinues,
DuplicateConnection,
}
impl From<SessionCheckReply> for auth_protocol::server_status::Status {
fn from(value: SessionCheckReply) -> Self {
match value {
SessionCheckReply::NoOtherConnection => Self::Ok,
SessionCheckReply::ThisConnectionContinues => Self::OkSimultaneous,
SessionCheckReply::OtherConnectionContinues => Self::NotOk,
SessionCheckReply::DuplicateConnection => Self::Alive,
}
}
}
#[allow(missing_debug_implementations)]
#[derive(RactorMessage)]
pub enum NodeServerMessage {
ConnectionOpened {
stream: Box<crate::net::NetworkStream>,
is_server: bool,
},
ConnectionOpenedExternal {
stream: Box<dyn crate::net::ClusterBidiStream>,
is_server: bool,
},
ConnectionAuthenticated(ActorId),
ConnectionReady(ActorId),
CheckSession {
peer_name: auth_protocol::NameMessage,
reply: RpcReplyPort<SessionCheckReply>,
},
UpdateSession {
actor_id: ActorId,
name: auth_protocol::NameMessage,
},
GetSessions(RpcReplyPort<HashMap<NodeId, NodeServerSessionInformation>>),
SubscribeToEvents {
id: String,
subscription: Box<dyn NodeEventSubscription>,
},
UnsubscribeToEvents(String),
PortChanged {
port: u16,
},
}
#[derive(RactorMessage, Debug)]
pub enum NodeSessionMessage {
MessageReceived(crate::protocol::NetworkMessage),
SendMessage(crate::protocol::node::NodeMessage),
GetAuthenticationState(RpcReplyPort<bool>),
GetReadyState(RpcReplyPort<bool>),
}
#[derive(Copy, Clone, Debug, Default)]
pub enum NodeConnectionMode {
#[default]
Transitive,
Isolated,
}
#[derive(Debug)]
pub struct NodeServer {
port: crate::net::NetworkPort,
cookie: String,
node_name: String,
hostname: String,
encryption_mode: IncomingEncryptionMode,
connection_mode: NodeConnectionMode,
listen_addr: Option<IpAddr>,
}
impl NodeServer {
pub fn new(
port: crate::net::NetworkPort,
cookie: String,
node_name: String,
hostname: String,
encryption_mode: Option<IncomingEncryptionMode>,
connection_mode: Option<NodeConnectionMode>,
) -> Self {
Self {
port,
cookie,
node_name,
hostname,
encryption_mode: encryption_mode.unwrap_or(IncomingEncryptionMode::Raw),
connection_mode: connection_mode.unwrap_or(NodeConnectionMode::Isolated),
listen_addr: None,
}
}
pub fn with_listen_addr(mut self, addr: IpAddr) -> Self {
self.listen_addr = Some(addr);
self
}
}
#[derive(Debug, Clone)]
pub struct NodeServerSessionInformation {
pub actor: ActorRef<NodeSessionMessage>,
pub peer_name: Option<auth_protocol::NameMessage>,
pub is_server: bool,
pub node_id: NodeId,
pub peer_addr: String,
}
impl NodeServerSessionInformation {
fn new(
actor: ActorRef<NodeSessionMessage>,
is_server: bool,
node_id: NodeId,
peer_addr: String,
) -> Self {
Self {
actor,
peer_name: None,
is_server,
node_id,
peer_addr,
}
}
fn update(&mut self, peer_name: auth_protocol::NameMessage) {
self.peer_name = Some(peer_name);
}
}
pub trait NodeEventSubscription: Send + 'static {
fn node_session_opened(&self, ses: NodeServerSessionInformation);
fn node_session_disconnected(&self, ses: NodeServerSessionInformation);
fn node_session_authenicated(&self, ses: NodeServerSessionInformation);
#[allow(unused_variables)]
fn node_session_ready(&self, ses: NodeServerSessionInformation) {}
}
#[allow(missing_debug_implementations)]
pub struct NodeServerState {
listener: ActorRef<crate::net::ListenerMessage>,
node_sessions: HashMap<ActorId, NodeServerSessionInformation>,
node_id_counter: NodeId,
this_node_name: auth_protocol::NameMessage,
subscriptions: HashMap<String, Box<dyn NodeEventSubscription>>,
}
impl NodeServerState {
fn check_peers(&self, new_peer: auth_protocol::NameMessage) -> SessionCheckReply {
for (_key, value) in self.node_sessions.iter() {
if let Some(existing_peer) = &value.peer_name {
if existing_peer.name == new_peer.name {
match (
existing_peer.name.cmp(&self.this_node_name.name),
value.is_server,
) {
(Ordering::Greater, true) | (Ordering::Less, false) => {
value.actor.stop(Some("duplicate_connection".to_string()));
return SessionCheckReply::OtherConnectionContinues;
}
(Ordering::Greater, false) | (Ordering::Less, true) => {
return SessionCheckReply::ThisConnectionContinues;
}
_ => {
return SessionCheckReply::DuplicateConnection;
}
}
}
}
}
SessionCheckReply::NoOtherConnection
}
}
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for NodeServer {
type Msg = NodeServerMessage;
type State = NodeServerState;
type Arguments = ();
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
let listener = crate::net::Listener::new(
self.port,
myself.clone(),
self.encryption_mode.clone(),
self.listen_addr,
);
let (actor_ref, _) =
Actor::spawn_linked(None, listener, myself.clone(), myself.get_cell()).await?;
Ok(Self::State {
node_sessions: HashMap::new(),
listener: actor_ref,
node_id_counter: 0,
this_node_name: auth_protocol::NameMessage {
flags: Some(auth_protocol::NodeFlags {
version: PROTOCOL_VERSION,
}),
name: format!("{}@{}", self.node_name, self.hostname),
connection_string: format!("{}:{}", self.hostname, self.port),
},
subscriptions: HashMap::new(),
})
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::ConnectionOpened { stream, is_server } => {
let node_id = state.node_id_counter;
let peer_addr = stream.peer_addr().to_string();
if let Ok((actor, _)) = Actor::spawn_linked(
None,
NodeSession::new(
node_id,
is_server,
self.cookie.clone(),
myself.clone(),
state.this_node_name.clone(),
self.connection_mode,
),
*stream,
myself.get_cell(),
)
.await
{
let ses = NodeServerSessionInformation::new(
actor.clone(),
is_server,
node_id,
peer_addr,
);
for (_, sub) in state.subscriptions.iter() {
sub.node_session_opened(ses.clone());
}
state.node_sessions.insert(actor.get_id(), ses);
state.node_id_counter += 1;
} else {
tracing::warn!("Failed to startup `NodeSession`, dropping connection");
}
}
Self::Msg::ConnectionOpenedExternal { stream, is_server } => {
let peer_label = stream.peer_label();
let local_label = stream.local_label();
let (reader, writer) = stream.split();
let external_stream = Box::new(crate::net::NetworkStream::External {
peer_label: peer_label.clone(),
local_label,
reader,
writer,
});
let node_id = state.node_id_counter;
let peer_addr = peer_label.unwrap_or_else(|| "external".to_string());
if let Ok((actor, _)) = Actor::spawn_linked(
None,
NodeSession::new(
node_id,
is_server,
self.cookie.clone(),
myself.clone(),
state.this_node_name.clone(),
self.connection_mode,
),
*external_stream,
myself.get_cell(),
)
.await
{
let ses = NodeServerSessionInformation::new(
actor.clone(),
is_server,
node_id,
peer_addr,
);
for (_, sub) in state.subscriptions.iter() {
sub.node_session_opened(ses.clone());
}
state.node_sessions.insert(actor.get_id(), ses);
state.node_id_counter += 1;
} else {
tracing::warn!(
"Failed to startup `NodeSession` for external transport, dropping connection"
);
}
}
Self::Msg::ConnectionAuthenticated(actor_id) => {
if let Some(entry) = state.node_sessions.get(&actor_id) {
for (_, sub) in state.subscriptions.iter() {
sub.node_session_authenicated(entry.clone());
}
}
}
Self::Msg::ConnectionReady(actor_id) => {
if let Some(entry) = state.node_sessions.get(&actor_id) {
for (_, sub) in state.subscriptions.iter() {
sub.node_session_ready(entry.clone());
}
}
}
Self::Msg::UpdateSession { actor_id, name } => {
if let Some(entry) = state.node_sessions.get_mut(&actor_id) {
entry.update(name);
}
}
Self::Msg::CheckSession { peer_name, reply } => {
let _ = reply.send(state.check_peers(peer_name));
}
Self::Msg::GetSessions(reply) => {
let mut map = HashMap::new();
for value in state.node_sessions.values() {
map.insert(value.node_id, value.clone());
}
let _ = reply.send(map);
}
Self::Msg::SubscribeToEvents { id, subscription } => {
state.subscriptions.insert(id, subscription);
}
Self::Msg::UnsubscribeToEvents(id) => {
let _ = state.subscriptions.remove(&id);
}
Self::Msg::PortChanged { port } => {
state.this_node_name.connection_string = format!("{}:{}", self.hostname, port);
}
}
Ok(())
}
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorFailed(actor, msg) => {
if state.listener.get_id() == actor.get_id() {
tracing::error!(
"The Node server's TCP listener failed with '{msg}'. Respawning!"
);
let listener = crate::net::Listener::new(
self.port,
myself.clone(),
self.encryption_mode.clone(),
self.listen_addr,
);
let (actor_ref, _) =
Actor::spawn_linked(None, listener, myself.clone(), myself.get_cell())
.await?;
state.listener = actor_ref;
} else {
match state.node_sessions.entry(actor.get_id()) {
Entry::Occupied(o) => {
tracing::warn!(
"Node session {:?} panicked with '{msg}'",
o.get().peer_name
);
let ses = o.remove();
for (_, sub) in state.subscriptions.iter() {
sub.node_session_disconnected(ses.clone());
}
}
Entry::Vacant(_) => {
tracing::warn!(
"An unknown actor ({:?}) panicked with '{msg}'",
actor.get_id()
);
}
}
}
}
SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => {
if state.listener.get_id() == actor.get_id() {
tracing::error!(
"The Node server's TCP listener exited with '{maybe_reason:?}'. Respawning!"
);
let listener = crate::net::Listener::new(
self.port,
myself.clone(),
self.encryption_mode.clone(),
self.listen_addr,
);
let (actor_ref, _) =
Actor::spawn_linked(None, listener, myself.clone(), myself.get_cell())
.await?;
state.listener = actor_ref;
} else {
match state.node_sessions.entry(actor.get_id()) {
Entry::Occupied(o) => {
tracing::warn!(
"Node session {:?} exited with '{:?}'",
o.get().peer_name,
maybe_reason
);
let ses = o.remove();
for (_, sub) in state.subscriptions.iter() {
sub.node_session_disconnected(ses.clone());
}
}
Entry::Vacant(_) => {
tracing::warn!(
"An unknown actor ({:?}) exited with '{:?}'",
actor.get_id(),
maybe_reason
);
}
}
}
}
_ => {
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[test]
fn test_node_server_creation() {
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
);
assert_eq!(node.port, 9090);
assert_eq!(node.cookie, "test_cookie");
assert_eq!(node.node_name, "test_node");
assert_eq!(node.hostname, "localhost");
assert!(node.listen_addr.is_none());
}
#[test]
fn test_node_server_with_listen_addr_ipv4() {
let ipv4_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
)
.with_listen_addr(ipv4_addr);
assert_eq!(node.listen_addr, Some(ipv4_addr));
assert_eq!(node.port, 9090); }
#[test]
fn test_node_server_with_listen_addr_ipv6() {
let ipv6_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
)
.with_listen_addr(ipv6_addr);
assert_eq!(node.listen_addr, Some(ipv6_addr));
}
#[test]
fn test_node_server_default_encryption_raw() {
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
);
match node.encryption_mode {
IncomingEncryptionMode::Raw => {
}
_ => {
panic!("Expected IncomingEncryptionMode::Raw");
}
}
}
#[test]
fn test_node_server_default_connection_mode() {
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
);
match node.connection_mode {
NodeConnectionMode::Isolated => {
}
_ => {
panic!("Expected NodeConnectionMode::Isolated");
}
}
}
#[test]
fn test_node_server_builder_chaining() {
let ipv4_addr = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
let node = NodeServer::new(
9090,
"test_cookie".to_string(),
"test_node".to_string(),
"localhost".to_string(),
None,
None,
)
.with_listen_addr(ipv4_addr);
assert_eq!(node.listen_addr, Some(ipv4_addr));
assert_eq!(node.port, 9090);
assert_eq!(node.node_name, "test_node");
}
}