use stageleft::{QuotedWithContext, q};
#[cfg(stageleft_runtime)]
use super::dynamic::DynLocation;
use super::{Location, LocationId};
use crate::compile::builder::{ClockId, FlowState};
use crate::compile::ir::{HydroNode, HydroSource};
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
use crate::forward_handle::{TickCycle, TickCycleHandle};
use crate::live_collections::Singleton;
use crate::live_collections::boundedness::Bounded;
use crate::live_collections::optional::Optional;
use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
use crate::location::TopLevel;
use crate::nondet::{NonDet, nondet};
/// A location wrapper that holds a [`Tick`] over `Loc`.
///
/// Its dynamic id is the wrapped tick's id nested inside `LocationId::Atomic`,
/// and it reuses the tick's flow state and root location.
// NOTE(review): the precise "atomic" execution guarantees are defined by the
// compiler/runtime and are not visible in this file — confirm before relying on them.
#[derive(Clone)]
pub struct Atomic<Loc> {
/// The tick this atomic location is built on.
pub(crate) tick: Tick<Loc>,
}
/// Type-erased location behaviour for [`Atomic`], derived from the wrapped tick
/// (instance-level queries) and from `L` (type-level queries).
impl<L> DynLocation for Atomic<L>
where
    L: DynLocation,
{
    /// Returns the wrapped tick's id nested inside [`LocationId::Atomic`].
    fn dyn_id(&self) -> LocationId {
        let tick_id = self.tick.dyn_id();
        LocationId::Atomic(Box::new(tick_id))
    }

    /// Returns the flow state of the wrapped tick.
    fn flow_state(&self) -> &FlowState {
        let Self { tick } = self;
        tick.flow_state()
    }

    /// Reports whatever `L` reports; the atomic wrapper does not change it.
    fn is_top_level() -> bool {
        <L as DynLocation>::is_top_level()
    }

    /// Reports whether the wrapped tick is multiversioned.
    fn multiversioned(&self) -> bool {
        let Self { tick } = self;
        tick.multiversioned()
    }

    /// Reports the cluster consistency declared by `L`.
    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
        <L as DynLocation>::cluster_consistency()
    }
}
/// [`Location`] implementation for [`Atomic`]: the root and consistency come from `L`,
/// and consistency conversions are performed on the wrapped tick.
impl<'a, L: Location<'a>> Location<'a> for Atomic<L> {
    type Root = L::Root;
    type DropConsistency = Atomic<L::DropConsistency>;

    /// Reports the consistency declared by `L`.
    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
        <L as Location<'a>>::consistency()
    }

    /// Returns the root of the wrapped tick.
    fn root(&self) -> Self::Root {
        let tick = &self.tick;
        tick.root()
    }

    /// Rebuilds this atomic location around the consistency-dropped form of its tick.
    fn drop_consistency(&self) -> Self::DropConsistency {
        let tick = self.tick.drop_consistency();
        Atomic { tick }
    }

    /// Inverse of [`Location::drop_consistency`]: restores the tick from its
    /// consistency-dropped form and wraps it again.
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
        let Atomic { tick: relaxed_tick } = l2;
        Self {
            tick: Tick::from_drop_consistency(relaxed_tick),
        }
    }
}
/// Implemented by collections that support being deferred via [`DeferTick::defer_tick`].
///
/// `Tick::cycle` requires this bound and applies `defer_tick` to the cycle source it creates.
// NOTE(review): presumably "defer" means the contents become visible one tick later;
// the implementors live outside this file — confirm there.
pub trait DeferTick {
/// Consumes the collection and returns its deferred counterpart of the same type.
fn defer_tick(self) -> Self;
}
/// A location scoped to a clock inside an outer location `L`.
///
/// Its dynamic id is `LocationId::Tick` built from `id` and the outer location's id,
/// and it never reports itself as top-level.
#[derive(Clone)]
pub struct Tick<L> {
/// Identifier of the clock this tick belongs to.
pub(crate) id: ClockId,
/// The outer location that this tick is nested in (exposed via `Tick::outer`).
pub(crate) l: L,
}
/// Type-erased location behaviour for [`Tick`], derived from the outer location.
impl<L> DynLocation for Tick<L>
where
    L: DynLocation,
{
    /// Combines this tick's clock id with the outer location's id in [`LocationId::Tick`].
    fn dyn_id(&self) -> LocationId {
        let outer_id = self.l.dyn_id();
        LocationId::Tick(self.id, Box::new(outer_id))
    }

    /// Returns the flow state of the outer location.
    fn flow_state(&self) -> &FlowState {
        let Self { l: outer, .. } = self;
        outer.flow_state()
    }

    /// A tick is nested inside another location, so it is never top-level.
    fn is_top_level() -> bool {
        false
    }

    /// Reports whether the outer location is multiversioned.
    fn multiversioned(&self) -> bool {
        let Self { l: outer, .. } = self;
        outer.multiversioned()
    }

    /// Reports the cluster consistency declared by `L`.
    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
        <L as DynLocation>::cluster_consistency()
    }
}
/// [`Location`] implementation for [`Tick`]: the root and consistency come from `L`,
/// and consistency conversions keep the clock id while converting the outer location.
impl<'a, L: Location<'a>> Location<'a> for Tick<L> {
    type Root = L::Root;
    type DropConsistency = Tick<L::DropConsistency>;

    /// Reports the consistency declared by `L`.
    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
        <L as Location<'a>>::consistency()
    }

    /// Returns the root of the outer location.
    fn root(&self) -> Self::Root {
        let outer = &self.l;
        outer.root()
    }

    /// Produces a tick with the same clock id over the consistency-dropped outer location.
    fn drop_consistency(&self) -> Self::DropConsistency {
        let id = self.id;
        let l = self.l.drop_consistency();
        Tick { id, l }
    }

    /// Inverse of [`Location::drop_consistency`]: keeps the clock id and restores
    /// the outer location from its consistency-dropped form.
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
        let Tick { id, l: relaxed_outer } = l2;
        Self {
            id,
            l: L::from_drop_consistency(relaxed_outer),
        }
    }
}
/// Constructors and helpers available on a [`Tick`] whose outer location
/// implements [`Location`].
impl<'a, L: Location<'a>> Tick<L> {
    /// Borrows the location this tick is nested in.
    pub fn outer(&self) -> &L {
        let Self { l: outer, .. } = self;
        outer
    }

    /// Builds a bounded stream of `()` values located at this tick.
    ///
    /// The stream is derived from `spin()` on the outer location: every spin
    /// element is expanded into the range `0..batch_size`, each item is mapped
    /// to `()`, and the result is batched into this tick.
    ///
    /// Only available when the outer location is [`TopLevel`].
    pub fn spin_batch(
        &self,
        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
    where
        L: TopLevel<'a>,
    {
        let spins = self.l.spin();
        let expanded = spins.flat_map_ordered(q!(move |_| 0..batch_size));
        let units = expanded.map(q!(|_| ()));
        let batched = units.batch(self, nondet!());

        // Take the IR node out of the batched stream (leaving a placeholder
        // behind) and re-wrap it in a stream typed at this tick.
        let location = self.clone();
        let node = batched.ir_node.replace(HydroNode::Placeholder);
        Stream::new(location, node)
    }

    /// Builds an [`Optional`] at this tick that never carries a value.
    ///
    /// It is backed by an iterator source over an empty array; the trailing
    /// `map` only adapts the element type to `T` and its closure is
    /// `unreachable!()` because the source yields nothing.
    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
        let quoted_empty = q!([]);
        let empty_iter =
            QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(quoted_empty, self);

        let location = self.clone();
        let source = HydroSource::Iter(empty_iter.into());
        let metadata = self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind());
        let unit_optional: Optional<(), Self, Bounded> =
            Optional::new(location, HydroNode::Source { source, metadata });

        unit_optional.map(q!(|_| unreachable!()))
    }

    /// Builds an [`Optional`] at this tick from the quoted expression `e`,
    /// using a singleton source flagged `first_tick_only`.
    // NOTE(review): presumably the value is present on the first tick and absent
    // afterwards; that behaviour is implemented by the compiler, not in this file.
    pub fn optional_first_tick<T: Clone>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Optional<T, Self, Bounded> {
        let value = e.splice_untyped_ctx(self);
        let location = self.clone();
        let node = HydroNode::SingletonSource {
            value: value.into(),
            first_tick_only: true,
            metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
        };
        Optional::new(location, node)
    }

    /// Builds a [`Singleton`] whose value is the quoted expression
    /// `tokio::time::Instant::now()`.
    ///
    /// The `_nondet` argument is unused by the body; requiring it makes callers
    /// acknowledge that reading the clock is non-deterministic.
    pub fn current_tick_instant(
        &self,
        _nondet: NonDet,
    ) -> Singleton<tokio::time::Instant, Tick<L::DropConsistency>, Bounded>
    where
        Self: Sized,
    {
        let now = q!(tokio::time::Instant::now());
        self.singleton(now)
    }

    /// Allocates a new cycle in this tick.
    ///
    /// Returns the [`TickCycleHandle`] for a freshly allocated cycle id together
    /// with the cycle's source collection, created at this tick and passed
    /// through [`DeferTick::defer_tick`].
    // NOTE(review): the handle is presumably completed later with the collection
    // that feeds the cycle — confirm against `TickCycleHandle`.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
        &self,
    ) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollection<'a, TickCycle, Location = L2> + DeferTick,
    {
        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();

        let handle = TickCycleHandle::new(cycle_id, Location::id(self));
        let deferred_source =
            S::create_source(cycle_id, self.clone().with_consistency_of()).defer_tick();

        (handle, deferred_source)
    }

    /// Allocates a new cycle in this tick that is seeded with `initial`.
    ///
    /// Returns the [`TickCycleHandle`] for a freshly allocated cycle id together
    /// with the source collection built from `initial` at this tick. Unlike
    /// [`Tick::cycle`], the source is not passed through `defer_tick` here.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle_with_initial<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
        &self,
        initial: S,
    ) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollectionWithInitial<'a, TickCycle, Location = L2>,
    {
        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();

        let handle = TickCycleHandle::new(cycle_id, Location::id(self));
        let seeded_source =
            S::create_source_with_initial(cycle_id, initial, self.clone().with_consistency_of());

        (handle, seeded_source)
    }
}
// Simulator-backed tests contrasting atomic and non-atomic processing of a
// write stream whose running sum is read by a second request stream.
// Everything in this module is gated on the `sim` feature.
#[cfg(test)]
mod tests {
#[cfg(feature = "sim")]
use stageleft::q;
#[cfg(feature = "sim")]
use crate::live_collections::sliced::sliced;
#[cfg(feature = "sim")]
use crate::location::Location;
#[cfg(feature = "sim")]
use crate::nondet::nondet;
#[cfg(feature = "sim")]
use crate::prelude::FlowBuilder;
// Writes are made atomic before being folded, and the write ack is taken from
// `end_atomic()`. Reads are answered with an atomic snapshot of the folded state.
#[cfg(feature = "sim")]
#[test]
fn sim_atomic_stream() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
// Two independent simulated inputs: write requests and (unit) read requests.
let (write_send, write_req) = node.sim_input();
let (read_send, read_req) = node.sim_input::<(), _, _>();
let atomic_write = write_req.atomic();
// Running sum of all written values, starting at 0.
let current_state = atomic_write.clone().fold(
q!(|| 0),
q!(|state: &mut i32, v: i32| {
*state += v;
}),
);
// The acknowledgement is the write stream after leaving the atomic section.
let write_ack_recv = atomic_write.end_atomic().sim_output();
let read_response_recv = sliced! {
let batch_of_req = use(read_req, nondet!());
let latest_singleton = use::atomic(current_state, nondet!());
// Pair every read request in the batch with the state snapshot.
batch_of_req.cross_singleton(latest_singleton)
}
.sim_output();
let sim_compiled = flow.sim().compiled();
// Scenario 1: the read is only sent after the write has been acknowledged,
// so the response must already include the written value.
let instances = sim_compiled.exhaustive(async || {
write_send.send(1);
write_ack_recv.assert_yields([1]).await;
read_send.send(());
assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
});
// Exactly one execution is expected to be explored for scenario 1.
assert_eq!(instances, 1);
// Scenario 2: write and read are both in flight before the ack is awaited;
// the read result is deliberately ignored, only the number of executions matters.
let instances_read_before_write = sim_compiled.exhaustive(async || {
write_send.send(1);
read_send.send(());
write_ack_recv.assert_yields([1]).await;
let _ = read_response_recv.next().await;
});
// NOTE(review): presumably the three executions are read-before-write,
// write-before-read, and both processed together — confirm against the simulator.
assert_eq!(instances_read_before_write, 3); }
// Same pipeline without `atomic()`: the ack comes straight from the write
// stream and the state is read with a plain `use`. The test is expected to
// panic, i.e. the exhaustive run must hit a failing assertion (or other panic).
// NOTE(review): presumably some execution acks the write before the folded
// state reflects it, so `v` is not 1 — confirm against the simulator.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_non_atomic_stream() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (write_send, write_req) = node.sim_input();
let (read_send, read_req) = node.sim_input::<(), _, _>();
// Running sum of all written values, starting at 0.
let current_state = write_req.clone().fold(
q!(|| 0),
q!(|state: &mut i32, v: i32| {
*state += v;
}),
);
let write_ack_recv = write_req.sim_output();
let read_response_recv = sliced! {
let batch_of_req = use(read_req, nondet!());
let latest_singleton = use(current_state, nondet!());
// Pair every read request in the batch with the state snapshot.
batch_of_req.cross_singleton(latest_singleton)
}
.sim_output();
flow.sim().exhaustive(async || {
write_send.send(1);
write_ack_recv.assert_yields([1]).await;
read_send.send(());
// A missing response is tolerated; a present one must equal the written value.
if let Some((_, v)) = read_response_recv.next().await {
assert_eq!(v, 1);
}
});
}
}