flarrow_api/io/
queries.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::tokio::sync::Mutex;
5
6type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
7type Senders = SharedMap<Uuid, MessageSender>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10pub struct Queries {
12 senders: Senders,
13 receivers: Receivers,
14
15 clock: Arc<uhlc::HLC>,
16
17 source: NodeLayout,
18}
19
20impl Queries {
21 pub fn new(
23 senders: Senders,
24 receivers: Receivers,
25 clock: Arc<uhlc::HLC>,
26 source: NodeLayout,
27 ) -> Self {
28 Self {
29 senders,
30 receivers,
31 clock,
32 source,
33 }
34 }
35
36 async fn compute(
37 &mut self,
38 query: impl Into<String>,
39 ) -> Result<(MessageSender, MessageReceiver, QueryLayout)> {
40 let label: String = query.into();
41 let layout = self.source.query(&label);
42
43 let sender = self
44 .senders
45 .lock()
46 .await
47 .remove(&layout.uuid)
48 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
49
50 let receiver = self
51 .receivers
52 .lock()
53 .await
54 .remove(&layout.uuid)
55 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
56
57 Ok((sender, receiver, layout))
58 }
59
60 pub async fn raw(&mut self, query: impl Into<String>) -> Result<RawQuery> {
63 let (tx, rx, layout) = self.compute(query).await?;
64
65 tracing::debug!(
66 "Creating new raw query '{}' (uuid: {}) for node '{}' (uuid: {})",
67 layout.label,
68 layout.uuid,
69 self.source.label,
70 self.source.uuid
71 );
72
73 Ok(RawQuery::new(
74 tx,
75 rx,
76 self.clock.clone(),
77 self.source.clone(),
78 layout,
79 ))
80 }
81
82 pub async fn with<T: ArrowMessage, F: ArrowMessage>(
84 &mut self,
85 query: impl Into<String>,
86 ) -> Result<Query<T, F>> {
87 let (tx, rx, layout) = self.compute(query).await?;
88
89 tracing::debug!(
90 "Creating new query '{}' (uuid: {}) for node '{}' (uuid: {})",
91 layout.label,
92 layout.uuid,
93 self.source.label,
94 self.source.uuid
95 );
96
97 Ok(Query::new(
98 tx,
99 rx,
100 self.clock.clone(),
101 self.source.clone(),
102 layout,
103 ))
104 }
105}