use std::{
collections::VecDeque,
sync::{Arc, Mutex},
task::Waker,
};
use crate::ObjectStateStream;
/// Queue-plus-waker state used to hand [`ObjectStateStream`] values from a
/// producer to a polling consumer.
///
/// `ObserveContractInlet` keeps this behind an `Arc<Mutex<_>>` and pushes into
/// it; `poll_next` pops from it.
pub(crate) struct ObserveContractShared {
    /// Streams that have been created but not yet handed out by `poll_next`,
    /// kept in arrival order (pushed at the back, popped from the front).
    object_state_stream_ready_for_check_out: VecDeque<ObjectStateStream>,
    /// Waker recorded by the most recent `poll_next` call that found the queue
    /// empty; `None` when nothing is registered or after it has been consumed
    /// by `wake`.
    waker: Option<Waker>,
}
impl ObserveContractShared {
pub fn new() -> ObserveContractShared {
ObserveContractShared { object_state_stream_ready_for_check_out: VecDeque::new(), waker: None }
}
fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
pub fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<ObjectStateStream>> {
if let Some(object_state_stream) = self.object_state_stream_ready_for_check_out.pop_front() {
std::task::Poll::Ready(Some(object_state_stream))
} else {
self.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
}
/// Producer-side handle onto an [`ObserveContractShared`].
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// underlying shared state.
#[derive(Clone)]
pub(crate) struct ObserveContractInlet {
    /// The mutex-guarded state this inlet pushes new streams into.
    shared: Arc<Mutex<ObserveContractShared>>,
}
impl ObserveContractInlet {
pub fn new(shared: Arc<Mutex<ObserveContractShared>>) -> ObserveContractInlet {
ObserveContractInlet { shared }
}
pub fn stream_created(&mut self, object_state_stream: ObjectStateStream) {
let mut shared = self.shared.lock().unwrap();
shared.object_state_stream_ready_for_check_out.push_back(object_state_stream);
shared.wake();
}
}