use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::rhythm::Rhythm;
/// Cloneable handle for reading and adjusting the repetition target of a
/// [`CountRhythm`].
///
/// Every clone points at the same atomic counter, so a value written through
/// one handle is observed through all the others.
#[derive(Debug, Clone)]
pub struct CountHandle {
    /// Target repetition count, shared by every holder of this `Arc`.
    shared_target: Arc<AtomicUsize>,
}

impl CountHandle {
    /// Overwrites the shared target count with `times`.
    ///
    /// The store uses `Ordering::Relaxed`: only the counter value itself is
    /// published, no other memory is synchronized by this call.
    pub fn set_times(&self, times: usize) {
        let target: &AtomicUsize = &self.shared_target;
        target.store(times, Ordering::Relaxed);
    }

    /// Returns the target count currently stored in the shared counter.
    ///
    /// The load uses `Ordering::Relaxed`, mirroring [`CountHandle::set_times`].
    pub fn get_times(&self) -> usize {
        let target: &AtomicUsize = &self.shared_target;
        target.load(Ordering::Relaxed)
    }
}
/// Rhythm that runs its domain a target number of times.
///
/// The target is kept in a shared atomic counter so that a [`CountHandle`]
/// obtained from [`create_count_rhythm`] can read or change it.
pub struct CountRhythm {
    /// Target repetition count; the same `Arc` may also be held by handles.
    shared_target: Arc<AtomicUsize>,
}

impl CountRhythm {
    /// Creates a rhythm whose target count starts at `initial_times`.
    pub fn new(initial_times: usize) -> Self {
        Self {
            shared_target: Arc::new(AtomicUsize::new(initial_times)),
        }
    }

    /// Builds a rhythm from launch parameters.
    ///
    /// Reads the `"count"` entry of `params` as an unsigned integer. When the
    /// entry is missing or is not representable as a `u64`, the count
    /// defaults to `1`. Values that do not fit in `usize` saturate at
    /// `usize::MAX` instead of silently wrapping.
    pub fn from_launch_params(params: &serde_yaml::Value) -> Self {
        let count = params
            .get("count")
            .and_then(|v| v.as_u64())
            // Clamp before narrowing: a bare `as usize` would wrap on targets
            // where `usize` is narrower than 64 bits (e.g. 2^32 would become 0).
            // After the clamp the cast is lossless.
            .map_or(1, |raw| raw.min(usize::MAX as u64) as usize);
        Self::new(count)
    }
}
pub fn create_count_rhythm(initial_times: usize) -> (CountHandle, CountRhythm) {
let rhythm = CountRhythm::new(initial_times);
let handle = CountHandle { shared_target: rhythm.shared_target.clone() };
(handle, rhythm)
}
impl Rhythm for CountRhythm {
    /// Zero-based sequence number handed to the domain on each iteration.
    type Yield = usize;
    /// The domain feeds nothing back to this rhythm.
    type Feed = ();
    /// Driving to completion produces no value besides the returned nodes.
    type Output = ();
    type Error = crate::RoplatError;

    /// Invokes `op_domain` repeatedly until the iteration count reaches the
    /// shared target.
    ///
    /// Each call receives ownership of `nodes` plus the current sequence
    /// number (starting at `0`) and must hand the nodes back. The target is
    /// re-read from the shared counter before every iteration, so a change
    /// made while driving is honored from the next check onward. After every
    /// iteration the task yields to the Tokio scheduler.
    ///
    /// Returns `()` together with the nodes as returned by the last call, or
    /// the original `nodes` untouched when the target is already reached.
    async fn drive<N, F, Fut>(
        &mut self,
        mut nodes: N,
        mut op_domain: F,
    ) -> (Self::Output, N)
    where
        N: Send,
        F: FnMut(N, Self::Yield) -> Fut + Send,
        Fut: Future<Output = (Self::Feed, N)> + Send,
    {
        let mut completed: usize = 0;
        // The load sits in the loop condition so the target is sampled afresh
        // before each iteration.
        while completed < self.shared_target.load(Ordering::Relaxed) {
            let (_, handed_back) = op_domain(nodes, completed).await;
            nodes = handed_back;
            completed += 1;
            // Give other tasks a chance to run between iterations.
            tokio::task::yield_now().await;
        }
        ((), nodes)
    }
}