use sealed::sealed;
use crate::compile::builder::CycleId;
use crate::location::Location;
use crate::location::dynamic::LocationId;
use crate::staging_util::Invariant;
/// Marker trait for the tag types that distinguish the kinds of cycle
/// receivers declared in this file ([`ForwardRef`] and [`TickCycle`]).
///
/// It has no methods; it only constrains the `Marker` / `Kind` type
/// parameters of the traits below.
#[sealed]
pub(crate) trait ReceiverKind {}
/// Type-level tag selecting the receiver kind used by [`ForwardHandle`].
///
/// The enum has no variants, so no value of it can ever be constructed; it
/// is only used as a type argument.
pub enum ForwardRef {}
// Registers `ForwardRef` as a valid receiver kind.
#[sealed]
impl ReceiverKind for ForwardRef {}
/// Type-level tag selecting the receiver kind used by [`TickCycleHandle`].
///
/// The enum has no variants, so no value of it can ever be constructed; it
/// is only used as a type argument.
pub enum TickCycle {}
// Registers `TickCycle` as a valid receiver kind.
#[sealed]
impl ReceiverKind for TickCycle {}
/// Implemented by collections that can be used to complete a cycle whose
/// receiver kind is `Marker`.
pub(crate) trait ReceiverComplete<'a, Marker: ReceiverKind> {
    /// Consumes `self` to complete the cycle identified by `cycle_id`.
    ///
    /// NOTE(review): `expected_location` is presumably the location the cycle
    /// was declared at, so implementors can check it against their own
    /// location — confirm against the implementations.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId);
}
/// A collection that can both complete a cycle of kind `Kind` (through its
/// [`ReceiverComplete`] supertrait) and be created as the source side of one.
pub(crate) trait CycleCollection<'a, Kind: ReceiverKind>: ReceiverComplete<'a, Kind> {
    /// The location type accepted by [`CycleCollection::create_source`].
    type Location: Location<'a>;

    /// Builds the collection for the cycle `id` at `location`.
    ///
    /// NOTE(review): presumably the returned value stands in for the data
    /// that is later supplied through `ReceiverComplete::complete` — confirm
    /// against the implementations.
    fn create_source(id: CycleId, location: Self::Location) -> Self;
}
/// Variant of a cycle collection whose source is created together with an
/// initial value of the same collection type.
pub(crate) trait CycleCollectionWithInitial<'a, Kind: ReceiverKind>:
    ReceiverComplete<'a, Kind>
{
    /// The location type accepted by
    /// [`CycleCollectionWithInitial::create_source_with_initial`].
    type Location: Location<'a>;

    /// Builds the collection for the cycle `cycle_id` at `location`, given
    /// `initial`.
    ///
    /// NOTE(review): how `initial` is combined with the values later fed
    /// into the cycle is defined by the implementations — confirm there.
    fn create_source_with_initial(
        cycle_id: CycleId,
        initial: Self,
        location: Self::Location,
    ) -> Self;
}
/// Handle for a [`ForwardRef`] cycle that still has to be completed with a
/// collection of type `C`.
///
/// The handle must be consumed with `complete`; its `Drop` implementation
/// panics if it is dropped while `completed` is still `false`.
#[expect(
    private_bounds,
    reason = "only Hydro collections can implement ReceiverComplete"
)]
pub struct ForwardHandle<'a, C>
where
    C: ReceiverComplete<'a, ForwardRef>,
{
    /// Set to `true` once the handle has been completed; checked on drop.
    completed: bool,
    /// Identifier of the cycle, forwarded to `C::complete`.
    cycle_id: CycleId,
    /// Location forwarded to `C::complete` alongside the cycle id.
    expected_location: LocationId,
    /// Zero-sized marker tying the handle to `'a` and `C` without storing a `C`.
    _phantom: Invariant<'a, C>,
}
#[expect(
private_bounds,
reason = "only Hydro collections can implement ReceiverComplete"
)]
impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
Self {
completed: false,
cycle_id,
expected_location,
_phantom: std::marker::PhantomData,
}
}
}
impl<'a, C> Drop for ForwardHandle<'a, C>
where
    C: ReceiverComplete<'a, ForwardRef>,
{
    /// Panics if the handle is dropped before it was completed.
    ///
    /// The check is skipped while the thread is already panicking, so an
    /// unrelated unwind is not turned into a double panic.
    fn drop(&mut self) {
        if self.completed || std::thread::panicking() {
            return;
        }

        panic!("ForwardHandle dropped without being completed");
    }
}
#[expect(
    private_bounds,
    reason = "only Hydro collections can implement ReceiverComplete"
)]
impl<'a, C> ForwardHandle<'a, C>
where
    C: ReceiverComplete<'a, ForwardRef>,
{
    /// Completes the cycle behind this handle with `stream`, consuming the
    /// handle.
    ///
    /// `stream` is converted into `C` and handed to
    /// `ReceiverComplete::complete` together with the stored cycle id and
    /// expected location.
    pub fn complete(mut self, stream: impl Into<C>) {
        // Mark as completed first so that dropping `self` at the end of this
        // function does not panic.
        self.completed = true;

        let collection: C = stream.into();
        let id = self.cycle_id;
        // The handle implements `Drop`, so the location cannot be moved out
        // of `self` and has to be cloned.
        let location = self.expected_location.clone();
        <C as ReceiverComplete<'a, ForwardRef>>::complete(collection, id, location);
    }
}
/// Handle for a [`TickCycle`] cycle that still has to be completed with a
/// collection of type `C`.
///
/// The handle must be consumed with `complete_next_tick`; its `Drop`
/// implementation panics if it is dropped while `completed` is still `false`.
#[expect(
    private_bounds,
    reason = "only Hydro collections can implement ReceiverComplete"
)]
pub struct TickCycleHandle<'a, C>
where
    C: ReceiverComplete<'a, TickCycle>,
{
    /// Set to `true` once the handle has been completed; checked on drop.
    completed: bool,
    /// Identifier of the cycle, forwarded to `C::complete`.
    cycle_id: CycleId,
    /// Location forwarded to `C::complete` alongside the cycle id.
    expected_location: LocationId,
    /// Zero-sized marker tying the handle to `'a` and `C` without storing a `C`.
    _phantom: Invariant<'a, C>,
}
#[expect(
private_bounds,
reason = "only Hydro collections can implement ReceiverComplete"
)]
impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
Self {
completed: false,
cycle_id,
expected_location,
_phantom: std::marker::PhantomData,
}
}
}
impl<'a, C> Drop for TickCycleHandle<'a, C>
where
    C: ReceiverComplete<'a, TickCycle>,
{
    /// Panics if the handle is dropped before it was completed.
    ///
    /// The check is skipped while the thread is already panicking, so an
    /// unrelated unwind is not turned into a double panic.
    fn drop(&mut self) {
        if self.completed || std::thread::panicking() {
            return;
        }

        panic!("TickCycleHandle dropped without being completed");
    }
}
#[expect(
    private_bounds,
    reason = "only Hydro collections can implement ReceiverComplete"
)]
impl<'a, C> TickCycleHandle<'a, C>
where
    C: ReceiverComplete<'a, TickCycle>,
{
    /// Completes the cycle behind this handle with `stream`, consuming the
    /// handle.
    ///
    /// `stream` is converted into `C` and handed to
    /// `ReceiverComplete::complete` together with the stored cycle id and
    /// expected location.
    pub fn complete_next_tick(mut self, stream: impl Into<C>) {
        // Mark as completed first so that dropping `self` at the end of this
        // function does not panic.
        self.completed = true;

        let collection: C = stream.into();
        let id = self.cycle_id;
        // The handle implements `Drop`, so the location cannot be moved out
        // of `self` and has to be cloned.
        let location = self.expected_location.clone();
        <C as ReceiverComplete<'a, TickCycle>>::complete(collection, id, location);
    }
}
/// Trait form of "complete this cycle handle with a value of type `S`".
///
/// Hidden from the generated documentation; in this file it is implemented
/// for [`TickCycleHandle`].
#[doc(hidden)]
pub trait CompleteCycle<S> {
/// Consumes the handle and completes it with `state`.
fn complete_next_tick(self, state: S);
}
impl<'a, C> CompleteCycle<C> for TickCycleHandle<'a, C>
where
    C: ReceiverComplete<'a, TickCycle>,
{
    /// Forwards to the inherent `TickCycleHandle::complete_next_tick`.
    fn complete_next_tick(self, state: C) {
        // Name the type explicitly so the call targets the inherent method
        // rather than this trait method.
        TickCycleHandle::<'a, C>::complete_next_tick(self, state)
    }
}