use crate::async_primitives::morsel_linearizer::MorselLinearizer;
use crate::morsel::Morsel;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputRecv;
/// Lifecycle state of the bridge.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Copy`/`Clone` so that states can be
/// logged and compared directly (e.g. in assertions).
///
/// NOTE(review): the code that moves the bridge between these states is not in this part of
/// the file; the per-variant descriptions below are inferred from the variant names — confirm.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BridgeState {
    /// Presumably: the bridge has not begun running yet.
    NotYetStarted,
    /// Presumably: the bridge is currently running.
    Running,
    /// The bridge has stopped; the payload records why.
    Stopped(StopReason),
}

/// Reason carried by [`BridgeState::Stopped`].
///
/// NOTE(review): the conditions under which each reason is produced are decided by code outside
/// this part of the file; the descriptions below are inferred from the variant names — confirm.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum StopReason {
    /// Presumably: the reader (upstream) side disconnected.
    ReadersDisconnected,
    /// Presumably: the compute node (downstream) side disconnected.
    ComputeNodeDisconnected,
}
/// Receiving side of the bridge: a source of [`Morsel`]s backed by one of two receiver kinds.
///
/// Morsels are pulled out through `BridgeRecvPort::recv`, which handles each variant differently.
pub enum BridgeRecvPort {
/// Receives from a single `FileReaderOutputRecv`, optionally preceded by one buffered morsel.
Direct {
/// Receiver that `recv` awaits on whenever `first_morsel` is `None`.
rx: FileReaderOutputRecv,
/// Morsel that the next `recv` call returns before touching `rx`; `recv` takes it, leaving
/// `None` behind.
/// NOTE(review): presumably a morsel that was already pulled from the reader before this port
/// was built — confirm against the code that constructs `Direct`.
first_morsel: Option<Morsel>,
},
/// Receives through a `MorselLinearizer`.
/// NOTE(review): presumably used when morsels from several producers must be delivered in
/// order — confirm against `MorselLinearizer` and the construction site.
Linearized { rx: MorselLinearizer },
}
impl BridgeRecvPort {
pub async fn recv(&mut self) -> Result<Morsel, ()> {
use BridgeRecvPort::*;
match self {
Direct { rx, first_morsel } => {
if let Some(v) = first_morsel.take() {
Ok(v)
} else {
rx.recv().await
}
},
Linearized { rx } => rx.get().await.ok_or(()),
}
}
}