use std::ops::Add;
use std::time::{Duration, Instant};
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, TryRecvError};
use crate::promise::streaming_promise::{StreamingPromise, StreamingPromiseState, UpdateResult};
/// Adapter that exposes a crossbeam channel [`Receiver`] through the
/// [`StreamingPromise`] interface, keeping every item it has received so far
/// in an in-memory buffer.
pub struct WrappedMspcReceiver<A> {
/// Channel end that items are pulled from.
receiver: Receiver<A>,
/// Items received so far, appended in the order they were pulled off the channel.
cached: Vec<A>,
/// Starts as `Streaming`; set to `Finished` once the channel reports that it
/// is disconnected.
status: StreamingPromiseState,
}
impl<A> WrappedMspcReceiver<A> {
    /// Wraps `receiver` in a promise that starts out in the `Streaming` state
    /// with an empty buffer. Nothing is read from the channel here.
    pub fn new(receiver: Receiver<A>) -> Self {
        let cached = Vec::new();
        let status = StreamingPromiseState::Streaming;
        Self {
            receiver,
            cached,
            status,
        }
    }
}
/// [`StreamingPromise`] backed by a crossbeam channel: every item pulled off
/// the channel is appended to the internal buffer, and the promise becomes
/// `Finished` once the channel reports that it is disconnected.
impl<A> StreamingPromise<A> for WrappedMspcReceiver<A> {
    /// Returns the state recorded by the most recent `drain`/`update` call
    /// (`Streaming` if neither has observed a disconnection yet).
    fn state(&self) -> StreamingPromiseState {
        self.status
    }

    /// Blocks on the channel, buffering items as they arrive.
    ///
    /// * `how_long = Some(d)`: keeps receiving until a single deadline `d`
    ///   from now (computed once on entry) expires or the channel disconnects.
    /// * `how_long = None`: keeps receiving until the channel disconnects.
    ///
    /// A duration so large that the deadline cannot be represented as an
    /// [`Instant`] is treated like `None` instead of panicking.
    ///
    /// Returns `Streaming` if the deadline expired first, `Finished` if the
    /// channel disconnected. If the promise is no longer `Streaming` on entry,
    /// the stored state is returned without touching the channel.
    fn drain(&mut self, how_long: Option<Duration>) -> StreamingPromiseState {
        if self.status != StreamingPromiseState::Streaming {
            return self.status;
        }

        // `Instant + Duration` panics when the sum is not representable, so use
        // `checked_add` and fall back to waiting without a deadline on overflow.
        let deadline = how_long.and_then(|duration| Instant::now().checked_add(duration));

        loop {
            let received = match deadline {
                Some(deadline) => self.receiver.recv_deadline(deadline),
                // An untimed receive can only fail because of disconnection.
                None => self
                    .receiver
                    .recv()
                    .map_err(|RecvError| RecvTimeoutError::Disconnected),
            };

            match received {
                Ok(item) => self.cached.push(item),
                Err(RecvTimeoutError::Timeout) => {
                    return StreamingPromiseState::Streaming;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    self.status = StreamingPromiseState::Finished;
                    return self.status;
                }
            }
        }
    }

    /// Non-blocking poll: buffers every item that is immediately available and
    /// records a disconnection if one is observed.
    ///
    /// `has_changed` is `true` when at least one item was buffered or the
    /// state moved away from `Streaming` during this call. If the promise is
    /// no longer `Streaming` on entry, the channel is not touched and
    /// `has_changed` is `false`.
    fn update(&mut self) -> UpdateResult {
        if self.status != StreamingPromiseState::Streaming {
            return UpdateResult {
                state: self.status,
                has_changed: false,
            };
        }

        let mut received_any = false;
        loop {
            match self.receiver.try_recv() {
                Ok(item) => {
                    self.cached.push(item);
                    received_any = true;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    self.status = StreamingPromiseState::Finished;
                    break;
                }
            }
        }

        UpdateResult {
            state: self.status,
            // The state was `Streaming` on entry, so any other value now means
            // a transition happened during this call.
            has_changed: received_any || self.status != StreamingPromiseState::Streaming,
        }
    }

    /// Returns the items buffered so far, in the order they were received.
    fn read(&self) -> &Vec<A> {
        &self.cached
    }
}