1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6pub struct Query<T: ArrowMessage, F: ArrowMessage> {
8 pub raw: RawQuery,
9
10 _phantom: std::marker::PhantomData<(T, F)>,
11}
12
13impl<T: ArrowMessage, F: ArrowMessage> Query<T, F> {
14 pub fn new(
16 tx: MessageSender,
17 rx: MessageReceiver,
18 clock: Arc<HLC>,
19 source: NodeLayout,
20 layout: QueryLayout,
21 ) -> Self {
22 Self {
23 raw: RawQuery::new(tx, rx, clock, source, layout),
24 _phantom: std::marker::PhantomData,
25 }
26 }
27
28 pub fn blocking_query(&mut self, data: T) -> Result<(Header, F)> {
31 let (header, data) = self.raw.blocking_query(
32 data.try_into_arrow()
33 .wrap_err(report_failed_conversion_to_arrow::<T>(
34 &self.raw.source,
35 &self.raw.layout,
36 ))?
37 .into_data(),
38 )?;
39
40 Ok((
41 header,
42 F::try_from_arrow(data).wrap_err(report_failed_conversion_from_arrow::<F>(
43 &self.raw.source,
44 &self.raw.layout,
45 ))?,
46 ))
47 }
48
49 pub async fn query(&mut self, data: T) -> Result<(Header, F)> {
51 let (header, data) = self
52 .raw
53 .query(
54 data.try_into_arrow()
55 .wrap_err(report_failed_conversion_to_arrow::<T>(
56 &self.raw.source,
57 &self.raw.layout,
58 ))?
59 .into_data(),
60 )
61 .await?;
62
63 Ok((
64 header,
65 F::try_from_arrow(data).wrap_err(report_failed_conversion_from_arrow::<F>(
66 &self.raw.source,
67 &self.raw.layout,
68 ))?,
69 ))
70 }
71}