use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::NodeError;
use crate::errors::QueueError;
use crate::memory::PlacementAcceptance;
use crate::message::{payload::Payload, Message};
use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
use crate::policy::{BatchingPolicy, EdgePolicy, NodePolicy, WatermarkState};
use crate::prelude::{
BatchView, EdgeDescriptor, HeaderStore, MemoryManager, PlatformClock, Telemetry,
};
use crate::types::{EdgeIndex, MessageToken, NodeIndex, PortId};
use core::marker::PhantomData;
pub const EXTERNAL_INGRESS_NODE: NodeIndex = NodeIndex::new(usize::MAX);
pub trait Source<OutP, const OUT: usize>
where
OutP: Payload,
{
type Error;
fn open(&mut self) -> Result<(), Self::Error>;
fn try_produce(&mut self) -> Option<(usize, Message<OutP>)>;
fn ingress_occupancy(&self) -> EdgeOccupancy;
fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64>;
fn output_acceptance(&self) -> [PlacementAcceptance; OUT];
fn capabilities(&self) -> NodeCapabilities;
#[inline]
fn into_sourcenode(self, policy: NodePolicy) -> SourceNode<Self, OutP, OUT>
where
Self: Sized,
{
SourceNode::new(self, policy)
}
fn policy(&self) -> NodePolicy;
fn ingress_policy(&self) -> EdgePolicy;
}
pub struct SourceNode<S, OutP, const OUT: usize>
where
S: Source<OutP, OUT>,
OutP: Payload,
{
src: S,
policy: NodePolicy,
_pd: PhantomData<OutP>,
}
impl<S, OutP, const OUT: usize> From<S> for SourceNode<S, OutP, OUT>
where
S: Source<OutP, OUT>,
OutP: Payload,
{
#[inline]
fn from(src: S) -> Self {
let policy = src.policy();
SourceNode::new(src, policy)
}
}
impl<S, OutP, const OUT: usize> SourceNode<S, OutP, OUT>
where
S: Source<OutP, OUT>,
OutP: Payload,
{
#[inline]
pub const fn new(src: S, policy: NodePolicy) -> Self {
Self {
src,
policy,
_pd: PhantomData,
}
}
#[inline]
pub fn source_ref(&self) -> &S {
&self.src
}
#[inline]
pub fn source_mut(&mut self) -> &mut S {
&mut self.src
}
#[inline]
pub fn ingress_edge_has_batch(&self) -> bool {
let ingress_occ = self.source_ref().ingress_occupancy();
if *ingress_occ.items() == 0 {
return false;
}
let policy = self.policy.batching();
let fixed_opt = *policy.fixed_n();
let delta_opt = *policy.max_delta_t();
match (fixed_opt, delta_opt) {
(Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
(None, Some(_max_delta_t)) => {
true
}
(Some(fixed_n), Some(max_delta_t)) => {
if *ingress_occ.items() < fixed_n {
return false;
}
let first_tick_opt = self.src.peek_ingress_creation_tick(0);
let last_tick_opt = self
.src
.peek_ingress_creation_tick(fixed_n.saturating_sub(1));
match (first_tick_opt, last_tick_opt) {
(Some(first_ticks), Some(last_ticks)) => {
let span = last_ticks.saturating_sub(first_ticks);
span <= *max_delta_t.as_u64()
}
_ => false,
}
}
(None, None) => {
true
}
}
}
}
impl<S, OutP, const OUT: usize> Node<0, OUT, (), OutP> for SourceNode<S, OutP, OUT>
where
S: Source<OutP, OUT>,
OutP: Payload + Copy,
{
#[inline]
fn describe_capabilities(&self) -> NodeCapabilities {
self.src.capabilities()
}
#[inline]
fn input_acceptance(&self) -> [PlacementAcceptance; 0] {
[]
}
#[inline]
fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
self.src.output_acceptance()
}
#[inline]
fn policy(&self) -> NodePolicy {
self.policy
}
#[cfg(any(test, feature = "bench"))]
fn set_policy(&mut self, policy: NodePolicy) {
self.policy = policy;
}
#[inline]
fn node_kind(&self) -> NodeKind {
NodeKind::Source
}
#[inline]
fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
where
T: Telemetry,
{
self.src
.open()
.map_err(|_| NodeError::external_unavailable())
}
#[inline]
fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
where
T: Telemetry,
{
Ok(())
}
#[inline]
fn process_message<C>(
&mut self,
_msg: &Message<()>,
_sys_clock: &C,
) -> Result<ProcessResult<OutP>, NodeError>
where
C: PlatformClock + Sized,
{
if let Some((_port, msg)) = self.src.try_produce() {
Ok(ProcessResult::Output(msg))
} else {
Err(NodeError::no_input())
}
}
#[inline]
fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
&mut self,
ctx: &mut StepContext<'g, 't, 'ck, 0, OUT, (), OutP, InQ, OutQ, InM, OutM, C, Tel>,
) -> Result<StepResult, NodeError>
where
InQ: Edge,
OutQ: Edge,
InM: MemoryManager<()>,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
Tel: Telemetry + Sized,
{
if let Some((port, msg)) = self.src.try_produce() {
ctx.push_output(port, msg)
} else {
Ok(StepResult::NoInput)
}
}
fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
&mut self,
ctx: &mut StepContext<
'graph,
'telemetry,
'clock,
0,
OUT,
(),
OutP,
InQ,
OutQ,
InM,
OutM,
C,
Tel,
>,
) -> Result<StepResult, NodeError>
where
InQ: Edge,
OutQ: Edge,
InM: MemoryManager<()>,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
Tel: Telemetry + Sized,
{
let ingress_occ = self.source_ref().ingress_occupancy();
if *ingress_occ.items() == 0 {
return Ok(StepResult::NoInput);
}
let policy = self.policy();
let fixed_opt = *policy.batching().fixed_n();
let delta_opt = *policy.batching().max_delta_t();
let has_batch = match (fixed_opt, delta_opt) {
(Some(fixed_n), None) => *ingress_occ.items() >= fixed_n,
(None, Some(_max_delta_t)) => {
true
}
(Some(fixed_n), Some(max_delta_t)) => {
if *ingress_occ.items() < fixed_n {
false
} else {
let first_tick_opt = self.src.peek_ingress_creation_tick(0);
let last_tick_opt = self
.src
.peek_ingress_creation_tick(fixed_n.saturating_sub(1));
match (first_tick_opt, last_tick_opt) {
(Some(first_ticks), Some(last_ticks)) => {
let span = last_ticks.saturating_sub(first_ticks);
span <= *max_delta_t.as_u64()
}
_ => false,
}
}
}
(None, None) => {
true
}
};
if !has_batch {
return Ok(StepResult::NoInput);
}
let batch_n: usize = fixed_opt.unwrap_or(1);
let mut made_progress = false;
for _ in 0..batch_n {
match self.src.try_produce() {
Some((port, msg)) => match ctx.push_output(port, msg) {
Ok(StepResult::MadeProgress) => {
made_progress = true;
}
Ok(StepResult::Backpressured) | Err(_) => {
return Ok(StepResult::Backpressured);
}
Ok(_) => {}
},
None => {
break;
}
}
}
if made_progress {
Ok(StepResult::MadeProgress)
} else {
Ok(StepResult::NoInput)
}
}
#[inline]
fn on_watchdog_timeout<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<StepResult, NodeError>
where
Tel: Telemetry,
{
Ok(StepResult::WaitingOnExternal)
}
#[inline]
fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
where
Tel: Telemetry,
{
Ok(())
}
}
pub struct SourceIngressEdge<'src, OutP, S, const OUT: usize>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
src: &'src S,
_pd: PhantomData<OutP>,
}
impl<'src, OutP, S, const OUT: usize> SourceIngressEdge<'src, OutP, S, OUT>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
#[inline]
pub const fn new(src: &'src S) -> Self {
Self {
src,
_pd: PhantomData,
}
}
}
impl<'src, OutP, S, const OUT: usize> Edge for SourceIngressEdge<'src, OutP, S, OUT>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
#[inline]
fn try_push<H: HeaderStore>(
&mut self,
_token: MessageToken,
_policy: &EdgePolicy,
_headers: &H,
) -> EnqueueResult {
EnqueueResult::Rejected
}
#[inline]
fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek(&self) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
self.src.ingress_occupancy()
}
#[inline]
fn is_empty(&self) -> bool {
*self.src.ingress_occupancy().items() == 0
}
#[inline]
fn try_pop_batch<H: HeaderStore>(
&mut self,
_policy: &BatchingPolicy,
_headers: &H,
) -> Result<BatchView<'_, MessageToken>, QueueError> {
Err(QueueError::Empty)
}
}
pub struct IngressEdgeLink<'src, OutP, S, const OUT: usize>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
edge: SourceIngressEdge<'src, OutP, S, OUT>,
id: EdgeIndex,
upstream: PortId,
downstream: PortId,
policy: EdgePolicy,
name: Option<&'static str>,
}
impl<'src, OutP, S, const OUT: usize> IngressEdgeLink<'src, OutP, S, OUT>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
#[inline]
pub const fn from_source(
src: &'src S,
id: EdgeIndex,
upstream: PortId,
downstream: PortId,
policy: EdgePolicy,
name: Option<&'static str>,
) -> Self {
Self {
edge: SourceIngressEdge::new(src),
id,
upstream,
downstream,
policy,
name,
}
}
#[inline]
pub fn descriptor(&self) -> EdgeDescriptor {
EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
}
#[inline]
pub fn policy(&self) -> EdgePolicy {
self.policy
}
#[inline]
pub fn inner(&self) -> &SourceIngressEdge<'src, OutP, S, OUT> {
&self.edge
}
}
impl<'s, OutP, S, const OUT: usize> Edge for IngressEdgeLink<'s, OutP, S, OUT>
where
OutP: Payload,
S: Source<OutP, OUT> + ?Sized,
{
#[inline]
fn try_push<H: HeaderStore>(
&mut self,
_token: MessageToken,
_policy: &EdgePolicy,
_headers: &H,
) -> EnqueueResult {
EnqueueResult::Rejected
}
#[inline]
fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek(&self) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
self.edge.occupancy(policy)
}
#[inline]
fn is_empty(&self) -> bool {
self.edge.is_empty()
}
#[inline]
fn try_pop_batch<H: HeaderStore>(
&mut self,
_policy: &BatchingPolicy,
_headers: &H,
) -> Result<BatchView<'_, MessageToken>, QueueError> {
Err(QueueError::Empty)
}
}
pub trait IngressProbe: Send {
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;
}
pub struct NoProbe;
impl IngressProbe for NoProbe {
#[inline]
fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
EdgeOccupancy::new(0, 0, WatermarkState::BelowSoft)
}
}
#[cfg(feature = "std")]
pub type IngressProbeImpl = probe::SourceIngressProbe;
#[cfg(not(feature = "std"))]
pub type IngressProbeImpl = NoProbe;
#[cfg(feature = "std")]
pub mod probe {
use super::*;
use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct SourceIngressProbe {
items: Arc<AtomicUsize>,
bytes: Arc<AtomicUsize>,
}
impl SourceIngressProbe {
#[inline]
pub fn new() -> Self {
Self {
items: Arc::new(AtomicUsize::new(0)),
bytes: Arc::new(AtomicUsize::new(0)),
}
}
#[inline]
pub fn set_items(&self, n: usize) {
self.items.store(n, Ordering::Relaxed);
}
#[inline]
pub fn set_bytes(&self, b: usize) {
self.bytes.store(b, Ordering::Relaxed);
}
#[inline]
pub fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
let items = self.items.load(Ordering::Relaxed);
let bytes = self.bytes.load(Ordering::Relaxed);
EdgeOccupancy::new(items, bytes, policy.watermark(items, bytes))
}
}
impl Default for SourceIngressProbe {
fn default() -> Self {
Self {
items: Arc::new(AtomicUsize::new(0)),
bytes: Arc::new(AtomicUsize::new(0)),
}
}
}
impl super::IngressProbe for SourceIngressProbe {
#[inline]
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
SourceIngressProbe::occupancy(self, policy)
}
}
#[derive(Debug, Clone)]
pub struct SourceIngressProbeEdge<P: Payload> {
probe: SourceIngressProbe,
_pd: PhantomData<P>,
}
impl<P: Payload> SourceIngressProbeEdge<P> {
#[inline]
pub fn new(probe: SourceIngressProbe) -> Self {
Self {
probe,
_pd: PhantomData,
}
}
#[inline]
pub fn inner(&self) -> &SourceIngressProbe {
&self.probe
}
}
impl<P: Payload> Edge for SourceIngressProbeEdge<P> {
#[inline]
fn try_push<H: HeaderStore>(
&mut self,
_token: MessageToken,
_policy: &EdgePolicy,
_headers: &H,
) -> EnqueueResult {
EnqueueResult::Rejected
}
#[inline]
fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek(&self) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
self.probe.occupancy(policy)
}
#[inline]
fn is_empty(&self) -> bool {
self.probe.items.load(core::sync::atomic::Ordering::Relaxed) == 0
}
#[inline]
fn try_pop_batch<H: HeaderStore>(
&mut self,
_policy: &BatchingPolicy,
_headers: &H,
) -> Result<BatchView<'_, MessageToken>, QueueError> {
Err(QueueError::Empty)
}
}
#[derive(Clone)]
pub struct SourceIngressUpdater {
probe: SourceIngressProbe,
}
impl SourceIngressUpdater {
#[inline]
pub fn new(probe: SourceIngressProbe) -> Self {
Self { probe }
}
#[inline]
pub fn update(&self, items: usize, bytes: usize) {
self.probe.set_items(items);
self.probe.set_bytes(bytes);
}
}
#[inline]
pub fn new_probe_edge_pair<P: Payload>() -> (SourceIngressProbeEdge<P>, SourceIngressUpdater) {
let probe = SourceIngressProbe::new();
let edge = SourceIngressProbeEdge::<P>::new(probe.clone());
let updater = SourceIngressUpdater::new(probe);
(edge, updater)
}
#[inline]
pub fn new_probe_pair() -> (SourceIngressProbe, SourceIngressUpdater) {
let p = SourceIngressProbe::new();
(p.clone(), SourceIngressUpdater::new(p))
}
#[derive(Debug)]
pub struct ConcurrentIngressEdgeLink<OutP: Payload> {
edge: SourceIngressProbeEdge<OutP>,
id: EdgeIndex,
upstream: PortId,
downstream: PortId,
policy: EdgePolicy,
name: Option<&'static str>,
}
impl<OutP: Payload> ConcurrentIngressEdgeLink<OutP> {
#[inline]
pub fn from_probe(
probe_edge: SourceIngressProbeEdge<OutP>,
id: EdgeIndex,
upstream: PortId,
downstream: PortId,
policy: EdgePolicy,
name: Option<&'static str>,
) -> Self {
Self {
edge: probe_edge,
id,
upstream,
downstream,
policy,
name,
}
}
#[inline]
pub fn descriptor(&self) -> EdgeDescriptor {
EdgeDescriptor::new(self.id, self.upstream, self.downstream, self.name)
}
#[inline]
pub fn policy(&self) -> EdgePolicy {
self.policy
}
#[inline]
pub fn inner(&self) -> &SourceIngressProbeEdge<OutP> {
&self.edge
}
#[inline]
pub fn inner_mut(&mut self) -> &mut SourceIngressProbeEdge<OutP> {
&mut self.edge
}
}
impl<OutP: Payload> Edge for ConcurrentIngressEdgeLink<OutP> {
#[inline]
fn try_push<H: HeaderStore>(
&mut self,
_token: MessageToken,
_policy: &EdgePolicy,
_headers: &H,
) -> EnqueueResult {
EnqueueResult::Rejected
}
#[inline]
fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek(&self) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
Err(QueueError::Empty)
}
#[inline]
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
self.edge.occupancy(policy)
}
#[inline]
fn is_empty(&self) -> bool {
self.edge.is_empty()
}
#[inline]
fn try_pop_batch<H: HeaderStore>(
&mut self,
_policy: &BatchingPolicy,
_headers: &H,
) -> Result<BatchView<'_, MessageToken>, QueueError> {
Err(QueueError::Empty)
}
}
}