use std::{collections::HashMap, sync::Arc};
use crate::prelude::{thirdparty::arrow_array::Array, *};
pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
pub raw: RawQueryable,
_phantom: std::marker::PhantomData<(T, F)>,
}
impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
pub fn new(
tx: HashMap<Uuid, MessageSender>,
rx: MessageReceiver,
clock: Arc<HLC>,
source: NodeID,
layout: QueryableID,
) -> Self {
Self {
raw: RawQueryable::new(tx, rx, clock, source, layout),
_phantom: std::marker::PhantomData,
}
}
pub async fn on_query(
&mut self,
response: impl AsyncFnOnce(TypedDataflowMessage<T>) -> Result<F>,
) -> Result<()> {
let source = self.raw.source.clone();
let layout = self.raw.layout.clone();
self.raw
.on_query(async move |message| {
let result = response(
message
.try_into()
.wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
)
.await?;
Ok(result
.try_into_arrow()
.wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
.into_data())
})
.await
}
}