datum-core 0.9.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
//! Typed and type-erased graph ports.
//!
//! [`Inlet`]/[`Outlet`] are the typed handles a stage hands back through its
//! [`Shape`](super::Shape); [`AnyInlet`]/[`AnyOutlet`] are their erased forms
//! (carrying the element [`TypeId`] and a name) used by the builder for runtime
//! wiring validation. A port is just an identity (`PortId` + name) plus a
//! phantom element type — it stores no runtime state; per-materialization port
//! state lives in [`GraphStageLogic`](super::GraphStageLogic).

use super::*;

/// A typed graph inlet: the downstream end of an edge that accepts `T` elements.
///
/// Cheap to clone (an id + an `Arc<str>` name). Call [`erase`](Self::erase) to
/// get the type-erased [`AnyInlet`] the builder validates and wires against.
#[derive(PartialEq, Eq, Hash)]
pub struct Inlet<T: 'static> {
    id: PortId,
    name: Arc<str>,
    _marker: PhantomData<fn() -> T>,
}

impl<T: 'static> Inlet<T> {
    /// Creates an inlet called `name`, taking its id from `next_port_id()`.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        let id = next_port_id();
        Self::with_id(id, name)
    }

    /// Creates an inlet with a caller-supplied `id`.
    ///
    /// `name` is converted to a `String` and then into an `Arc<str>`.
    pub(super) fn with_id(id: PortId, name: impl Into<String>) -> Self {
        let owned: String = name.into();
        Self::with_name(id, Arc::from(owned))
    }

    /// Creates an inlet with a caller-supplied `id`, reusing an existing
    /// `Arc<str>` as the name.
    pub(super) fn with_arc_name(id: PortId, name: Arc<str>) -> Self {
        Self::with_name(id, name)
    }

    /// Common constructor that every other constructor funnels into.
    fn with_name(id: PortId, name: Arc<str>) -> Self {
        let _marker = PhantomData;
        Self { id, name, _marker }
    }

    /// Returns this port's id.
    #[must_use]
    pub const fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    /// Returns this port's name.
    #[must_use]
    pub fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    /// Builds the type-erased [`AnyInlet`] for this port.
    ///
    /// The result carries the same id, a shared handle to the same name, and
    /// the `TypeId` and type name of `T`.
    #[must_use]
    pub fn erase(&self) -> AnyInlet {
        let name = Arc::clone(&self.name);
        let type_id = TypeId::of::<T>();
        let element_name = type_name::<T>();
        AnyInlet {
            id: self.id,
            name,
            type_id,
            type_name: element_name,
        }
    }
}

// Written by hand so that cloning a port handle never requires `T: Clone`.
impl<T: 'static> Clone for Inlet<T> {
    /// Copies the id and shares the name via `Arc::clone`.
    fn clone(&self) -> Self {
        let Self { id, name, .. } = self;
        let name = Arc::clone(name);
        Self {
            id: *id,
            name,
            _marker: PhantomData,
        }
    }
}

// Written by hand so that `Debug` is available without a `T: Debug` bound.
impl<T: 'static> fmt::Debug for Inlet<T> {
    /// Formats as an `Inlet` struct with `id`, `name` and the element type's
    /// name under `type`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let element = type_name::<T>();
        let mut out = f.debug_struct("Inlet");
        out.field("id", &self.id);
        out.field("name", &self.name);
        out.field("type", &element);
        out.finish()
    }
}

/// A typed graph outlet: the upstream end of an edge that emits `T` elements.
///
/// Cheap to clone (an id + an `Arc<str>` name). Call [`erase`](Self::erase) to
/// get the type-erased [`AnyOutlet`] the builder validates and wires against.
#[derive(PartialEq, Eq, Hash)]
pub struct Outlet<T: 'static> {
    id: PortId,
    name: Arc<str>,
    _marker: PhantomData<fn() -> T>,
}

impl<T: 'static> Outlet<T> {
    /// Creates an outlet called `name`, taking its id from `next_port_id()`.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        let id = next_port_id();
        Self::with_id(id, name)
    }

    /// Creates an outlet with a caller-supplied `id`.
    ///
    /// `name` is converted to a `String` and then into an `Arc<str>`.
    pub(super) fn with_id(id: PortId, name: impl Into<String>) -> Self {
        let owned: String = name.into();
        Self::with_name(id, Arc::from(owned))
    }

    /// Creates an outlet with a caller-supplied `id`, reusing an existing
    /// `Arc<str>` as the name.
    pub(super) fn with_arc_name(id: PortId, name: Arc<str>) -> Self {
        Self::with_name(id, name)
    }

    /// Common constructor that every other constructor funnels into.
    fn with_name(id: PortId, name: Arc<str>) -> Self {
        let _marker = PhantomData;
        Self { id, name, _marker }
    }

    /// Returns this port's id.
    #[must_use]
    pub const fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    /// Returns this port's name.
    #[must_use]
    pub fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    /// Builds the type-erased [`AnyOutlet`] for this port.
    ///
    /// The result carries the same id, a shared handle to the same name, and
    /// the `TypeId` and type name of `T`.
    #[must_use]
    pub fn erase(&self) -> AnyOutlet {
        let name = Arc::clone(&self.name);
        let type_id = TypeId::of::<T>();
        let element_name = type_name::<T>();
        AnyOutlet {
            id: self.id,
            name,
            type_id,
            type_name: element_name,
        }
    }
}

// Written by hand so that cloning a port handle never requires `T: Clone`.
impl<T: 'static> Clone for Outlet<T> {
    /// Copies the id and shares the name via `Arc::clone`.
    fn clone(&self) -> Self {
        let Self { id, name, .. } = self;
        let name = Arc::clone(name);
        Self {
            id: *id,
            name,
            _marker: PhantomData,
        }
    }
}

// Written by hand so that `Debug` is available without a `T: Debug` bound.
impl<T: 'static> fmt::Debug for Outlet<T> {
    /// Formats as an `Outlet` struct with `id`, `name` and the element type's
    /// name under `type`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let element = type_name::<T>();
        let mut out = f.debug_struct("Outlet");
        out.field("id", &self.id);
        out.field("name", &self.name);
        out.field("type", &element);
        out.finish()
    }
}

/// Type-erased [`Inlet`] carrying the element `TypeId` and name.
///
/// Produced by [`Inlet::erase`]. The builder stores these and compares their
/// `type_id` on `connect`/`wire` to validate that an edge joins ports of the
/// same element type at runtime.
///
/// The derived `PartialEq`/`Eq`/`Hash` take all four fields into account.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AnyInlet {
    /// Id copied from the typed [`Inlet`] this value was erased from.
    id: PortId,
    /// Port name; shares its allocation with the originating [`Inlet`].
    /// Accessible from the parent module.
    pub(super) name: Arc<str>,
    /// `TypeId` of the inlet's element type.
    type_id: TypeId,
    /// `std::any::type_name` of the element type.
    type_name: &'static str,
}

impl AnyInlet {
    /// Returns the id of the port this value was erased from.
    #[must_use]
    pub const fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    /// Returns the port's name.
    #[must_use]
    pub fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    /// Returns the `TypeId` of the port's element type.
    #[must_use]
    pub const fn type_id(&self) -> TypeId {
        let Self { type_id, .. } = self;
        *type_id
    }

    /// Returns the element type's name as recorded when the port was erased.
    #[must_use]
    pub const fn type_name(&self) -> &'static str {
        let Self { type_name, .. } = self;
        *type_name
    }
}

/// Type-erased [`Outlet`] carrying the element `TypeId` and name.
///
/// Produced by [`Outlet::erase`]. The builder stores these and compares their
/// `type_id` on `connect`/`wire` to validate that an edge joins ports of the
/// same element type at runtime.
///
/// The derived `PartialEq`/`Eq`/`Hash` take all four fields into account.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AnyOutlet {
    /// Id copied from the typed [`Outlet`] this value was erased from.
    id: PortId,
    /// Port name; shares its allocation with the originating [`Outlet`].
    /// Accessible from the parent module.
    pub(super) name: Arc<str>,
    /// `TypeId` of the outlet's element type.
    type_id: TypeId,
    /// `std::any::type_name` of the element type.
    type_name: &'static str,
}

impl AnyOutlet {
    /// Returns the id of the port this value was erased from.
    #[must_use]
    pub const fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    /// Returns the port's name.
    #[must_use]
    pub fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    /// Returns the `TypeId` of the port's element type.
    #[must_use]
    pub const fn type_id(&self) -> TypeId {
        let Self { type_id, .. } = self;
        *type_id
    }

    /// Returns the element type's name as recorded when the port was erased.
    #[must_use]
    pub const fn type_name(&self) -> &'static str {
        let Self { type_name, .. } = self;
        *type_name
    }
}

/// Common read accessors over a typed port, independent of element type or
/// direction. Implemented by both [`Inlet`] and [`Outlet`]; used by handler
/// helpers (e.g. `set_handler`) that only need a port's id, name, and kind.
pub trait PortRef {
    /// Returns the port's id.
    fn id(&self) -> PortId;
    /// Returns the port's name.
    fn name(&self) -> &str;
    /// Returns the port's direction: `PortKind::Inlet` for an [`Inlet`],
    /// `PortKind::Outlet` for an [`Outlet`].
    fn kind(&self) -> PortKind;
}

/// Direction-agnostic view of an [`Inlet`]; `kind` is always `PortKind::Inlet`.
impl<T: 'static> PortRef for Inlet<T> {
    fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    fn kind(&self) -> PortKind {
        PortKind::Inlet
    }
}

/// Direction-agnostic view of an [`Outlet`]; `kind` is always `PortKind::Outlet`.
impl<T: 'static> PortRef for Outlet<T> {
    fn id(&self) -> PortId {
        let Self { id, .. } = self;
        *id
    }

    fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    fn kind(&self) -> PortKind {
        PortKind::Outlet
    }
}