use std::{
error::Error,
fmt::{self, Debug},
sync::{Arc, Weak},
};
use uuid::Uuid;
use super::*;
pub trait Port {
type Indication: Sized + Send + 'static + Clone + Debug;
type Request: Sized + Send + 'static + Clone + Debug;
}
struct CommonPortData<P: Port + 'static> {
provide_channels: Vec<ProvidedRef<P>>,
require_channels: Vec<RequiredRef<P>>,
}
impl<P: Port + 'static> CommonPortData<P> {
fn new() -> CommonPortData<P> {
CommonPortData {
provide_channels: Vec::new(),
require_channels: Vec::new(),
}
}
fn clear_by_id(&mut self, component_id: Uuid) -> bool {
let mut found_match = false;
self.provide_channels.retain(|channel| {
let channel_match = matches!(
channel.owned_by_component_with_id(component_id),
OwnershipResult::Owned | OwnershipResult::Deallocated
);
found_match |= channel_match;
!channel_match
});
self.require_channels.retain(|channel| {
let channel_match = matches!(
channel.owned_by_component_with_id(component_id),
OwnershipResult::Owned | OwnershipResult::Deallocated
);
found_match |= channel_match;
!channel_match
});
found_match
}
fn cleanup(&mut self) {
self.provide_channels.retain(|channel| channel.is_live());
self.require_channels.retain(|channel| channel.is_live());
}
}
pub struct ProvidedPort<P: Port + 'static> {
common: CommonPortData<P>,
parent: Option<Weak<dyn CoreContainer>>,
msg_queue: Arc<ConcurrentQueue<P::Request>>,
}
impl<P: Port + 'static> ProvidedPort<P> {
pub fn uninitialised() -> ProvidedPort<P> {
ProvidedPort {
common: CommonPortData::new(),
parent: None,
msg_queue: Arc::new(ConcurrentQueue::new()),
}
}
pub fn trigger(&mut self, event: P::Indication) -> () {
let mut cleanup_required = false;
self.common
.require_channels
.iter()
.for_each_with(event, |c, e| {
cleanup_required |= c.enqueue(e);
});
if cleanup_required {
self.common.cleanup();
}
}
pub fn connect(&mut self, c: RequiredRef<P>) -> () {
self.common.require_channels.push(c);
}
pub fn disconnect_port(&mut self, c: RequiredRef<P>) -> bool {
let mut found_match = false;
self.common.require_channels.retain(|other| {
let matches = &c == other;
found_match |= matches;
!matches
});
found_match
}
pub fn disconnect_component(&mut self, c: &dyn CoreContainer) -> bool {
self.common.clear_by_id(c.id())
}
pub fn share(&mut self) -> ProvidedRef<P> {
match self.parent {
Some(ref p) => {
let core_container = p.clone();
ProvidedRef {
msg_queue: Arc::downgrade(&self.msg_queue),
component: core_container,
}
}
None => panic!("Port is not properly initialized!"),
}
}
pub fn set_parent(&mut self, p: Arc<dyn CoreContainer>) -> () {
self.parent = Some(Arc::downgrade(&p));
}
pub fn dequeue(&self) -> Option<P::Request> {
self.msg_queue.pop()
}
}
pub struct RequiredPort<P: Port + 'static> {
common: CommonPortData<P>,
parent: Option<Weak<dyn CoreContainer>>,
msg_queue: Arc<ConcurrentQueue<P::Indication>>,
}
impl<P: Port + 'static> RequiredPort<P> {
pub fn uninitialised() -> RequiredPort<P> {
RequiredPort {
common: CommonPortData::new(),
parent: None,
msg_queue: Arc::new(ConcurrentQueue::new()),
}
}
pub fn trigger(&mut self, event: P::Request) -> () {
let mut cleanup_required = false;
self.common
.provide_channels
.iter()
.for_each_with(event, |c, e| {
cleanup_required |= c.enqueue(e);
});
if cleanup_required {
self.common.cleanup();
}
}
pub fn connect(&mut self, c: ProvidedRef<P>) -> () {
self.common.provide_channels.push(c);
}
pub fn disconnect_port(&mut self, c: ProvidedRef<P>) -> bool {
let mut found_match = false;
self.common.provide_channels.retain(|other| {
let matches = &c == other;
found_match |= matches;
!matches
});
found_match
}
pub fn disconnect_component(&mut self, c: &dyn CoreContainer) -> bool {
self.common.clear_by_id(c.id())
}
pub fn share(&mut self) -> RequiredRef<P> {
match self.parent {
Some(ref p) => {
let core_container = p.clone();
RequiredRef {
msg_queue: Arc::downgrade(&self.msg_queue),
component: core_container,
}
}
None => panic!("Port is not properly initialized!"),
}
}
pub fn set_parent(&mut self, p: Arc<dyn CoreContainer>) -> () {
self.parent = Some(Arc::downgrade(&p));
}
pub fn dequeue(&self) -> Option<P::Indication> {
self.msg_queue.pop()
}
}
pub struct ProvidedRef<P: Port + 'static> {
component: Weak<dyn CoreContainer>,
msg_queue: Weak<ConcurrentQueue<P::Request>>,
}
impl<P: Port + 'static> Clone for ProvidedRef<P> {
fn clone(&self) -> ProvidedRef<P> {
ProvidedRef {
component: self.component.clone(),
msg_queue: self.msg_queue.clone(),
}
}
}
impl<P: Port + 'static> PartialEq for ProvidedRef<P> {
fn eq(&self, other: &Self) -> bool {
self.msg_queue.ptr_eq(&other.msg_queue)
}
}
impl<P: Port + 'static> ProvidedRef<P> {
pub(crate) fn enqueue(&self, event: P::Request) -> bool {
match (self.msg_queue.upgrade(), self.component.upgrade()) {
(Some(q), Some(c)) => {
let sd = c.core().increment_work();
q.push(event);
if let SchedulingDecision::Schedule = sd {
let system = c.core().system();
system.schedule(c.clone());
}
false
}
(_q, _c) => {
#[cfg(test)]
println!(
"Dropping event as target (queue? {:?}, component? {:?}) is unavailable: {:?}",
_q.is_some(),
_c.is_some(),
event
);
true
}
}
}
fn is_live(&self) -> bool {
self.component.strong_count() > 0 && self.msg_queue.strong_count() > 0
}
pub fn owned_by_component_with_id(&self, component_id: Uuid) -> OwnershipResult {
if let Some(c) = self.component.upgrade() {
if c.id() == component_id {
OwnershipResult::Owned
} else {
OwnershipResult::NotOwned
}
} else {
OwnershipResult::Deallocated
}
}
}
pub struct RequiredRef<P: Port + 'static> {
component: Weak<dyn CoreContainer>,
msg_queue: Weak<ConcurrentQueue<P::Indication>>,
}
impl<P: Port + 'static> Clone for RequiredRef<P> {
fn clone(&self) -> RequiredRef<P> {
RequiredRef {
component: self.component.clone(),
msg_queue: self.msg_queue.clone(),
}
}
}
impl<P: Port + 'static> PartialEq for RequiredRef<P> {
fn eq(&self, other: &Self) -> bool {
self.msg_queue.ptr_eq(&other.msg_queue)
}
}
impl<P: Port + 'static> RequiredRef<P> {
pub(crate) fn enqueue(&self, event: P::Indication) -> bool {
match (self.msg_queue.upgrade(), self.component.upgrade()) {
(Some(q), Some(c)) => {
let sd = c.core().increment_work();
q.push(event);
if let SchedulingDecision::Schedule = sd {
let system = c.core().system();
system.schedule(c.clone());
}
false
}
(_q, _c) => {
#[cfg(test)]
println!(
"Dropping event as target (queue? {:?}, component? {:?}) is unavailable: {:?}",
_q.is_some(),
_c.is_some(),
event
);
true
}
}
}
fn is_live(&self) -> bool {
self.component.strong_count() > 0 && self.msg_queue.strong_count() > 0
}
pub fn owned_by_component_with_id(&self, component_id: Uuid) -> OwnershipResult {
if let Some(c) = self.component.upgrade() {
if c.id() == component_id {
OwnershipResult::Owned
} else {
OwnershipResult::NotOwned
}
} else {
OwnershipResult::Deallocated
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OwnershipResult {
Owned,
NotOwned,
Deallocated,
}
#[derive(Debug, PartialEq, Eq)]
pub enum TryLockError {
Poisoned,
WouldBlock,
}
impl fmt::Display for TryLockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(
&match *self {
TryLockError::Poisoned => "poisoned lock: another task failed inside",
TryLockError::WouldBlock => "try_lock failed because the operation would block",
},
f,
)
}
}
impl Error for TryLockError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
}
impl<T> From<std::sync::TryLockError<T>> for TryLockError {
fn from(error: std::sync::TryLockError<T>) -> Self {
match error {
std::sync::TryLockError::Poisoned(_) => TryLockError::Poisoned,
std::sync::TryLockError::WouldBlock => TryLockError::WouldBlock,
}
}
}
pub struct DisconnectError<C> {
pub channel: C,
pub error: TryLockError,
}
impl<C> fmt::Debug for DisconnectError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DisconnectError")
.field("error", &self.error)
.finish()
}
}
impl<C> fmt::Display for DisconnectError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel disconnection failed")
}
}
impl<C> Error for DisconnectError<C> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.error)
}
}
pub trait Channel {
fn boxed(self) -> Box<dyn Channel + Send + 'static>
where
Self: Sized;
fn disconnect(self) -> Result<(), DisconnectError<Self>>
where
Self: Sized,
{
let res = self.disconnect_by_ref();
res.map_err(|e| DisconnectError {
channel: self,
error: e,
})
}
fn disconnect_by_ref(&self) -> Result<(), TryLockError>;
}
pub struct TwoWayChannel<P, C1, C2>
where
P: Port + 'static,
C1: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
C2: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
provider: Weak<Component<C1>>,
requirer: Weak<Component<C2>>,
provided_ref: ProvidedRef<P>,
required_ref: RequiredRef<P>,
}
impl<P, C1, C2> TwoWayChannel<P, C1, C2>
where
P: Port + 'static,
C1: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
C2: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
pub(crate) fn new(
provider: &Arc<Component<C1>>,
requirer: &Arc<Component<C2>>,
provided_ref: ProvidedRef<P>,
required_ref: RequiredRef<P>,
) -> Self {
TwoWayChannel {
provider: Arc::downgrade(provider),
requirer: Arc::downgrade(requirer),
provided_ref,
required_ref,
}
}
}
impl<P, C1, C2> Channel for TwoWayChannel<P, C1, C2>
where
P: Port + 'static,
C1: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
C2: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
fn boxed(self) -> std::boxed::Box<(dyn Channel + Send + 'static)> {
Box::new(self)
}
fn disconnect_by_ref(&self) -> Result<(), TryLockError> {
if let Some(component) = self.provider.upgrade() {
let mut core = component.mutable_core.try_lock()?;
ProvideRef::disconnect(&mut core.definition, self.required_ref.clone());
}
if let Some(component) = self.requirer.upgrade() {
let mut core = component.mutable_core.try_lock()?;
RequireRef::disconnect(&mut core.definition, self.provided_ref.clone());
}
Ok(())
}
}
pub struct ProviderChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
{
provider: Weak<Component<C>>,
required_ref: RequiredRef<P>,
}
impl<P, C> ProviderChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
{
pub(crate) fn new(provider: &Arc<Component<C>>, required_ref: RequiredRef<P>) -> Self {
ProviderChannel {
provider: Arc::downgrade(provider),
required_ref,
}
}
}
impl<P, C> Channel for ProviderChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Provide<P> + ProvideRef<P>,
{
fn boxed(self) -> std::boxed::Box<(dyn Channel + Send + 'static)> {
Box::new(self)
}
fn disconnect_by_ref(&self) -> Result<(), TryLockError> {
if let Some(component) = self.provider.upgrade() {
let mut core = component.mutable_core.try_lock()?;
ProvideRef::disconnect(&mut core.definition, self.required_ref.clone());
}
Ok(())
}
}
pub struct RequirerChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
requirer: Weak<Component<C>>,
provided_ref: ProvidedRef<P>,
}
impl<P, C> RequirerChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
pub(crate) fn new(requirer: &Arc<Component<C>>, provided_ref: ProvidedRef<P>) -> Self {
RequirerChannel {
requirer: Arc::downgrade(requirer),
provided_ref,
}
}
}
impl<P, C> Channel for RequirerChannel<P, C>
where
P: Port + 'static,
C: ComponentDefinition + Sized + 'static + Require<P> + RequireRef<P>,
{
fn boxed(self) -> std::boxed::Box<(dyn Channel + Send + 'static)> {
Box::new(self)
}
fn disconnect_by_ref(&self) -> Result<(), TryLockError> {
if let Some(component) = self.requirer.upgrade() {
let mut core = component.mutable_core.try_lock()?;
RequireRef::disconnect(&mut core.definition, self.provided_ref.clone());
}
Ok(())
}
}
impl Channel for Box<(dyn Channel + Send + 'static)> {
fn boxed(self) -> std::boxed::Box<(dyn Channel + Send + 'static)> {
self
}
fn disconnect_by_ref(&self) -> Result<(), TryLockError> {
Channel::disconnect_by_ref(self.as_ref())
}
}