use std::sync::Arc;
use crate::prelude::*;
use thirdparty::arrow_data::ArrayData;
pub struct RawQuery {
pub tx: MessageSender,
pub rx: MessageReceiver,
pub clock: Arc<HLC>,
pub source: NodeLayout,
pub layout: QueryLayout,
}
impl RawQuery {
pub fn new(
tx: MessageSender,
rx: MessageReceiver,
clock: Arc<HLC>,
source: NodeLayout,
layout: QueryLayout,
) -> Self {
Self {
tx,
rx,
clock,
source,
layout,
}
}
pub fn blocking_query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
let data = DataflowMessage {
header: Header {
timestamp: self.clock.new_timestamp(),
source: (self.source.uuid, self.layout.uuid),
},
data,
};
self.tx
.blocking_send(data)
.wrap_err(report_error_sending(&self.source, &self.layout))?;
let DataflowMessage { header, data } = self
.rx
.blocking_recv()
.ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
Ok((header, data))
}
pub async fn query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
let data = DataflowMessage {
header: Header {
timestamp: self.clock.new_timestamp(),
source: (self.source.uuid, self.layout.uuid),
},
data,
};
self.tx
.send(data)
.await
.wrap_err(report_error_sending(&self.source, &self.layout))?;
let DataflowMessage { header, data } = self
.rx
.recv()
.await
.ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
Ok((header, data))
}
}