use super::*;
use crate::{
messaging::{DispatchData, DispatchEnvelope, MsgEnvelope, SerialisedFrame},
net::buffers::ChunkRef,
};
use std::{
convert::TryFrom,
error::Error,
fmt::{self, Debug},
net::{AddrParseError, IpAddr, SocketAddr},
ops::Div,
str::FromStr,
};
use uuid::Uuid;
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(u8)]
pub enum Transport {
Local = 0b00,
Tcp = 0b01,
Udp = 0b10,
}
impl Transport {
pub fn is_local(&self) -> bool {
matches!(*self, Transport::Local)
}
pub fn is_remote(&self) -> bool {
!self.is_local()
}
}
impl fmt::Display for Transport {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
&Transport::Local => write!(fmt, "local"),
&Transport::Tcp => write!(fmt, "tcp"),
&Transport::Udp => write!(fmt, "udp"),
}
}
}
impl FromStr for Transport {
type Err = TransportParseError;
fn from_str(s: &str) -> Result<Transport, TransportParseError> {
match s {
"local" => Ok(Transport::Local),
"tcp" => Ok(Transport::Tcp),
"udp" => Ok(Transport::Udp),
_ => Err(TransportParseError),
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct TransportParseError;
impl fmt::Display for TransportParseError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.write_str("TransportParseError")
}
}
impl Error for TransportParseError {
fn description(&self) -> &str {
"Transport must be one of [local,tcp,udp]"
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PathParseError {
Form(String),
Transport(TransportParseError),
Addr(AddrParseError),
IllegalCharacter(char),
}
impl fmt::Display for PathParseError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PathParseError::Form(s) => write!(fmt, "Invalid formatting: {}", s),
PathParseError::Transport(e) => write!(fmt, "Could not parse transport: {}", e),
PathParseError::Addr(e) => write!(fmt, "Could not parse address: {}", e),
PathParseError::IllegalCharacter(c) => {
write!(fmt, "The path contains an illegal character: {}", c)
}
}
}
}
impl Error for PathParseError {
fn description(&self) -> &str {
"Path could not be parsed"
}
fn cause(&self) -> Option<&dyn Error> {
match self {
PathParseError::Form(_) => None,
PathParseError::Transport(e) => Some(e),
PathParseError::Addr(e) => Some(e),
PathParseError::IllegalCharacter(_) => None,
}
}
}
impl From<TransportParseError> for PathParseError {
fn from(e: TransportParseError) -> PathParseError {
PathParseError::Transport(e)
}
}
impl From<AddrParseError> for PathParseError {
fn from(e: AddrParseError) -> PathParseError {
PathParseError::Addr(e)
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SystemPath {
protocol: Transport,
address: IpAddr,
port: u16,
}
impl SystemPath {
pub fn new(protocol: Transport, address: IpAddr, port: u16) -> SystemPath {
SystemPath {
protocol,
address,
port,
}
}
pub fn with_socket(protocol: Transport, socket: SocketAddr) -> SystemPath {
SystemPath {
protocol,
address: socket.ip(),
port: socket.port(),
}
}
pub fn protocol(&self) -> Transport {
self.protocol
}
pub fn address(&self) -> &IpAddr {
&self.address
}
pub fn port(&self) -> u16 {
self.port
}
pub fn into_named_with_string(self, path: &str) -> Result<NamedPath, PathParseError> {
let parsed = parse_path(path);
self.into_named_with_vec(parsed)
}
pub fn socket_address(&self) -> SocketAddr {
SocketAddr::new(self.address, self.port)
}
pub fn into_named_with_vec(self, path: Vec<String>) -> Result<NamedPath, PathParseError> {
validate_lookup_path(&path)?;
let named = NamedPath::with_system(self, path);
Ok(named)
}
pub fn into_unique(self, id: Uuid) -> UniquePath {
UniquePath::with_system(self, id)
}
}
impl fmt::Display for SystemPath {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}://{}:{}", self.protocol, self.address, self.port)
}
}
pub trait SystemField {
fn system(&self) -> &SystemPath;
fn protocol(&self) -> Transport {
self.system().protocol()
}
fn address(&self) -> &IpAddr {
self.system().address()
}
fn port(&self) -> u16 {
self.system().port()
}
}
impl SystemField for SystemPath {
fn system(&self) -> &SystemPath {
self
}
}
pub trait ActorPathFactory {
fn actor_path(&self) -> ActorPath;
}
pub struct DispatchingPath<'a, 'b> {
path: &'a ActorPath,
ctx: &'b dyn Dispatching,
}
impl Dispatching for DispatchingPath<'_, '_> {
fn dispatcher_ref(&self) -> DispatcherRef {
self.ctx.dispatcher_ref()
}
}
impl ActorPathFactory for DispatchingPath<'_, '_> {
fn actor_path(&self) -> ActorPath {
self.path.clone()
}
}
#[derive(Clone, Debug)]
#[repr(u8)]
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ActorPath {
Unique(UniquePath),
Named(NamedPath),
}
impl ActorPath {
pub fn tell<S, B>(&self, m: B, from: &S) -> ()
where
S: ActorPathFactory + Dispatching,
B: Into<Box<dyn Serialisable>>,
{
let mut src = from.actor_path();
src.set_protocol(self.protocol());
self.tell_with_sender(m, from, src)
}
pub fn tell_with_sender<B, D>(&self, m: B, dispatch: &D, from: ActorPath) -> ()
where
B: Into<Box<dyn Serialisable>>,
D: Dispatching,
{
let msg: Box<dyn Serialisable> = m.into();
let dst = self.clone();
let env = DispatchEnvelope::Msg {
src: from.clone(),
dst: dst.clone(),
msg: DispatchData::Lazy(msg, from, dst),
};
dispatch.dispatcher_ref().enqueue(MsgEnvelope::Typed(env))
}
pub fn tell_serialised<CD, B>(&self, m: B, from: &CD) -> Result<(), SerError>
where
CD: ComponentTraits + ComponentLifecycle,
B: Serialisable + 'static,
{
let mut src = from.actor_path();
src.set_protocol(self.protocol());
self.tell_serialised_with_sender(m, from, src)
}
pub fn tell_serialised_with_sender<CD, B>(
&self,
m: B,
dispatch: &CD,
from: ActorPath,
) -> Result<(), SerError>
where
CD: ComponentTraits + ComponentLifecycle,
B: Serialisable + 'static,
{
if self.protocol() == Transport::Local {
self.tell_with_sender(m, dispatch, from);
Ok(())
} else {
dispatch.ctx().with_buffer(|buffer| {
let mut buf = buffer.get_buffer_encoder()?;
let msg =
crate::serialisation::ser_helpers::serialise_msg(&from, self, &m, &mut buf)?;
let env = DispatchEnvelope::Msg {
src: from,
dst: self.clone(),
msg: DispatchData::Serialised(SerialisedFrame::ChunkLease(msg)),
};
dispatch.dispatcher_ref().enqueue(MsgEnvelope::Typed(env));
Ok(())
})
}
}
pub fn tell_preserialised<CD>(&self, content: ChunkRef, from: &CD) -> Result<(), SerError>
where
CD: ComponentTraits + ComponentLifecycle,
{
let mut src = from.actor_path();
src.set_protocol(self.protocol());
self.tell_preserialised_with_sender(content, from, src)
}
pub fn tell_preserialised_with_sender<CD: ComponentTraits + ComponentLifecycle>(
&self,
content: ChunkRef,
dispatch: &CD,
from: ActorPath,
) -> Result<(), SerError> {
dispatch.ctx().with_buffer(|buffer| {
let mut buf = buffer.get_buffer_encoder()?;
let msg = crate::serialisation::ser_helpers::serialise_msg_with_preserialised(
&from, self, content, &mut buf,
)?;
let env = DispatchEnvelope::Msg {
src: from,
dst: self.clone(),
msg: DispatchData::Serialised(SerialisedFrame::ChunkRef(msg)),
};
dispatch.dispatcher_ref().enqueue(MsgEnvelope::Typed(env));
Ok(())
})
}
pub fn forward_with_original_sender<D>(
&self,
mut serialised_message: NetMessage,
dispatcher: &D,
) -> ()
where
D: Dispatching,
{
serialised_message.receiver = self.clone();
let env = DispatchEnvelope::ForwardedMsg {
msg: serialised_message,
};
dispatcher.dispatcher_ref().enqueue(MsgEnvelope::Typed(env));
}
pub fn forward_with_sender<D>(
&self,
mut serialised_message: NetMessage,
dispatch: &D,
from: ActorPath,
) -> ()
where
D: Dispatching,
{
serialised_message.receiver = self.clone();
serialised_message.sender = from;
let env = DispatchEnvelope::ForwardedMsg {
msg: serialised_message,
};
dispatch.dispatcher_ref().enqueue(MsgEnvelope::Typed(env));
}
pub fn using_dispatcher<'a, 'b>(
&'a self,
disp: &'b dyn Dispatching,
) -> DispatchingPath<'a, 'b> {
DispatchingPath {
path: self,
ctx: disp,
}
}
fn system_mut(&mut self) -> &mut SystemPath {
match self {
ActorPath::Unique(ref mut up) => up.system_mut(),
ActorPath::Named(ref mut np) => np.system_mut(),
}
}
pub fn set_protocol(&mut self, proto: Transport) {
self.system_mut().protocol = proto;
}
pub fn via_udp(&mut self) {
self.set_protocol(Transport::Udp);
}
pub fn via_tcp(&mut self) {
self.set_protocol(Transport::Tcp);
}
pub fn via_local(&mut self) {
self.set_protocol(Transport::Local);
}
pub fn named(self) -> Option<NamedPath> {
match self {
ActorPath::Named(p) => Some(p),
_ => None,
}
}
pub fn unwrap_named(self) -> NamedPath {
self.named().unwrap()
}
pub fn unique(self) -> Option<UniquePath> {
match self {
ActorPath::Unique(p) => Some(p),
_ => None,
}
}
pub fn unwrap_unique(self) -> UniquePath {
self.unique().unwrap()
}
}
impl SystemField for ActorPath {
fn system(&self) -> &SystemPath {
match self {
ActorPath::Unique(up) => up.system(),
ActorPath::Named(np) => np.system(),
}
}
}
impl From<(SystemPath, Uuid)> for ActorPath {
fn from(t: (SystemPath, Uuid)) -> ActorPath {
ActorPath::Unique(UniquePath {
system: t.0,
id: t.1,
})
}
}
impl From<UniquePath> for ActorPath {
fn from(p: UniquePath) -> ActorPath {
ActorPath::Unique(p)
}
}
impl From<NamedPath> for ActorPath {
fn from(p: NamedPath) -> ActorPath {
ActorPath::Named(p)
}
}
pub const PATH_SEP: char = '/';
pub const UNIQUE_PATH_SEP: char = '#';
pub const BROADCAST_MARKER: char = '*';
pub const SELECT_MARKER: char = '?';
impl fmt::Display for ActorPath {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ActorPath::Named(np) => {
let path = np.path.iter().fold(String::new(), |mut acc, arg| {
acc.push(PATH_SEP);
acc.push_str(arg);
acc
});
write!(fmt, "{}{}", np.system, path)
}
ActorPath::Unique(up) => write!(fmt, "{}{}{}", up.system, UNIQUE_PATH_SEP, up.id),
}
}
}
impl TryFrom<String> for ActorPath {
type Error = PathParseError;
fn try_from(s: String) -> Result<Self, Self::Error> {
let p = ActorPath::from_str(&s)?;
Ok(p)
}
}
impl FromStr for ActorPath {
type Err = PathParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains(UNIQUE_PATH_SEP) {
let p = UniquePath::from_str(s)?;
Ok(ActorPath::Unique(p))
} else {
let p = NamedPath::from_str(s)?;
Ok(ActorPath::Named(p))
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct UniquePath {
system: SystemPath,
id: Uuid,
}
impl UniquePath {
pub fn new(protocol: Transport, address: IpAddr, port: u16, id: Uuid) -> UniquePath {
UniquePath {
system: SystemPath::new(protocol, address, port),
id,
}
}
pub fn with_system(system: SystemPath, id: Uuid) -> UniquePath {
UniquePath { system, id }
}
pub fn with_socket(protocol: Transport, socket: SocketAddr, id: Uuid) -> UniquePath {
UniquePath {
system: SystemPath::with_socket(protocol, socket),
id,
}
}
pub fn id(&self) -> Uuid {
self.id
}
pub fn system_mut(&mut self) -> &mut SystemPath {
&mut self.system
}
}
impl TryFrom<String> for UniquePath {
type Error = PathParseError;
fn try_from(s: String) -> Result<Self, Self::Error> {
let p = UniquePath::from_str(&s)?;
Ok(p)
}
}
impl FromStr for UniquePath {
type Err = PathParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<&str> = s.split("://").collect();
if parts.len() != 2 {
return Err(PathParseError::Form(s.to_string()));
}
let proto: Transport = parts[0].parse()?;
let parts: Vec<&str> = parts[1].split(UNIQUE_PATH_SEP).collect();
if parts.len() != 2 {
return Err(PathParseError::Form(s.to_string()));
}
let socket = SocketAddr::from_str(parts[0])?;
let uuid =
Uuid::from_str(parts[1]).map_err(|_parse_err| PathParseError::Form(s.to_string()))?;
Ok(UniquePath::with_socket(proto, socket, uuid))
}
}
impl SystemField for UniquePath {
fn system(&self) -> &SystemPath {
&self.system
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamedPath {
system: SystemPath,
path: Vec<String>,
}
impl NamedPath {
pub fn new(protocol: Transport, address: IpAddr, port: u16, path: Vec<String>) -> NamedPath {
debug_assert!(
crate::actors::validate_lookup_path(&path).is_ok(),
"Path contains illegal characters: {:?}",
path
);
NamedPath {
system: SystemPath::new(protocol, address, port),
path,
}
}
pub fn with_socket(protocol: Transport, socket: SocketAddr, path: Vec<String>) -> NamedPath {
debug_assert!(
crate::actors::validate_lookup_path(&path).is_ok(),
"Path contains illegal characters: {:?}",
path
);
NamedPath {
system: SystemPath::with_socket(protocol, socket),
path,
}
}
pub fn with_system(system: SystemPath, path: Vec<String>) -> NamedPath {
debug_assert!(
crate::actors::validate_lookup_path(&path).is_ok(),
"Path contains illegal characters: {:?}",
path
);
NamedPath { system, path }
}
pub fn path_ref(&self) -> &[String] {
&self.path
}
pub fn clone_path(&self) -> Vec<String> {
self.path.clone()
}
pub fn into_path(self) -> Vec<String> {
self.path
}
pub fn system_mut(&mut self) -> &mut SystemPath {
&mut self.system
}
pub fn push(&mut self, segment: String) -> Result<(), PathParseError> {
if let Some(last_segment) = self.path.last() {
validate_lookup_path_segment(last_segment, false)?;
}
validate_lookup_path_segment(&segment, true)?;
self.path.push(segment);
Ok(())
}
pub fn append(mut self, path: &str) -> Result<Self, PathParseError> {
if let Some(last_segment) = self.path.last() {
validate_lookup_path_segment(last_segment, false)?;
}
let mut segments = parse_path(path);
self.path.append(&mut segments);
validate_lookup_path(&self.path)?;
Ok(self)
}
}
impl SystemField for NamedPath {
fn system(&self) -> &SystemPath {
&self.system
}
}
impl TryFrom<String> for NamedPath {
type Error = PathParseError;
fn try_from(s: String) -> Result<Self, Self::Error> {
NamedPath::from_str(&s)
}
}
impl FromStr for NamedPath {
type Err = PathParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s1: Vec<&str> = s.split("://").collect();
if s1.len() != 2 {
return Err(PathParseError::Form(s.to_string()));
}
let proto: Transport = s1[0].parse()?;
let mut s2: Vec<&str> = s1[1].split(PATH_SEP).collect();
if s2.is_empty() {
return Err(PathParseError::Form(s.to_string()));
}
let socket = SocketAddr::from_str(s2[0])?;
let path: Vec<String> = if s2.len() > 1 {
s2.split_off(1).into_iter().map(|v| v.to_string()).collect()
} else {
Vec::default()
};
validate_lookup_path(&path)?;
Ok(NamedPath::with_socket(proto, socket, path))
}
}
impl Div<&str> for NamedPath {
type Output = Self;
fn div(self, rhs: &str) -> Self::Output {
self.append(rhs).expect("illegal path")
}
}
impl Div<char> for NamedPath {
type Output = Self;
fn div(mut self, rhs: char) -> Self::Output {
self.push(rhs.to_string()).expect("illegal path");
self
}
}
pub fn parse_path(s: &str) -> Vec<String> {
s.split(PATH_SEP).map(|v| v.to_string()).collect()
}
pub fn validate_lookup_path(path: &[String]) -> Result<(), PathParseError> {
let len = path.len();
for (index, segment) in path.iter().enumerate() {
validate_lookup_path_segment(segment, (index + 1) == len)?;
}
Ok(())
}
pub fn validate_insert_path(path: &[String]) -> Result<(), PathParseError> {
for segment in path.iter() {
validate_insert_path_segment(segment)?;
}
Ok(())
}
pub(crate) fn validate_lookup_path_segment(
segment: &str,
is_last: bool,
) -> Result<(), PathParseError> {
if segment.is_empty() {
return Err(PathParseError::Form(
"Path segments may not be empty!".to_string(),
));
}
let len = segment.len();
for c in segment.chars() {
match c {
PATH_SEP => return Err(PathParseError::IllegalCharacter(PATH_SEP)),
BROADCAST_MARKER if !is_last && len == 1 => {
return Err(PathParseError::IllegalCharacter(BROADCAST_MARKER))
}
SELECT_MARKER if !is_last && len == 1 => {
return Err(PathParseError::IllegalCharacter(SELECT_MARKER))
}
UNIQUE_PATH_SEP => return Err(PathParseError::IllegalCharacter(UNIQUE_PATH_SEP)),
_ => (), }
}
Ok(())
}
pub(crate) fn validate_insert_path_segment(segment: &str) -> Result<(), PathParseError> {
if segment.is_empty() {
return Err(PathParseError::Form(
"Path segments may not be empty!".to_string(),
));
}
for c in segment.chars() {
match c {
PATH_SEP => return Err(PathParseError::IllegalCharacter(PATH_SEP)),
BROADCAST_MARKER => return Err(PathParseError::IllegalCharacter(BROADCAST_MARKER)),
SELECT_MARKER => return Err(PathParseError::IllegalCharacter(SELECT_MARKER)),
UNIQUE_PATH_SEP => return Err(PathParseError::IllegalCharacter(UNIQUE_PATH_SEP)),
_ => (), }
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
const PATH: &str = "local://127.0.0.1:0/test_actor";
#[test]
fn actor_path_strings() {
let ap = ActorPath::from_str(PATH).expect("a proper path");
println!("Got ActorPath={}", ap);
let s = ap.to_string();
assert_eq!(PATH, &s);
let ap2: ActorPath = s.parse().expect("a proper path");
assert_eq!(ap, ap2);
}
#[test]
fn actor_path_unique_strings() {
let ref1 = ActorPath::Unique(UniquePath::new(
Transport::Local,
"127.0.0.1".parse().expect("hardcoded IP"),
8080,
Uuid::new_v4(),
));
let ref1_string = ref1.to_string();
let ref1_deser = ActorPath::from_str(&ref1_string).expect("a proper path");
let ref1_deser2: ActorPath = ref1_string.parse().expect("a proper path");
assert_eq!(ref1, ref1_deser);
assert_eq!(ref1, ref1_deser2);
}
#[test]
fn actor_path_named_strings() {
let ref1 = ActorPath::Named(NamedPath::new(
Transport::Local,
"127.0.0.1".parse().expect("hardcoded IP"),
8080,
vec!["test".to_string(), "path".to_string()],
));
let ref1_string = ref1.to_string();
let ref1_deser = ActorPath::from_str(&ref1_string).expect("a proper path");
let ref1_deser2: ActorPath = ref1_string.parse().expect("a proper path");
assert_eq!(ref1, ref1_deser);
assert_eq!(ref1, ref1_deser2);
}
}