iridis_node/primitives/
queries.rs1use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Senders = SharedMap<Uuid, MessageSender>;
9type Receivers = SharedMap<Uuid, MessageReceiver>;
10
11pub struct Queries {
13 senders: Senders,
14 receivers: Receivers,
15
16 clock: Arc<uhlc::HLC>,
17
18 source: NodeID,
19}
20
21impl Queries {
22 pub fn new(
24 senders: Senders,
25 receivers: Receivers,
26 clock: Arc<uhlc::HLC>,
27 source: NodeID,
28 ) -> Self {
29 Self {
30 senders,
31 receivers,
32 clock,
33 source,
34 }
35 }
36
37 async fn compute(
38 &mut self,
39 query: impl Into<String>,
40 ) -> Result<(MessageSender, MessageReceiver, QueryID)> {
41 let label: String = query.into();
42 let layout = self.source.query(&label);
43
44 let sender = self
45 .senders
46 .lock()
47 .await
48 .remove(&layout.uuid)
49 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
50
51 let receiver = self
52 .receivers
53 .lock()
54 .await
55 .remove(&layout.uuid)
56 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
57
58 Ok((sender, receiver, layout))
59 }
60
61 pub async fn raw(&mut self, query: impl Into<String>) -> Result<RawQuery> {
64 let (tx, rx, layout) = self.compute(query).await?;
65
66 tracing::debug!(
67 "Creating new raw query '{}' (uuid: {}) for node '{}' (uuid: {})",
68 layout.label,
69 layout.uuid,
70 self.source.label,
71 self.source.uuid
72 );
73
74 Ok(RawQuery::new(
75 tx,
76 rx,
77 self.clock.clone(),
78 self.source.clone(),
79 layout,
80 ))
81 }
82
83 pub async fn with<T: ArrowMessage, F: ArrowMessage>(
85 &mut self,
86 query: impl Into<String>,
87 ) -> Result<Query<T, F>> {
88 let (tx, rx, layout) = self.compute(query).await?;
89
90 tracing::debug!(
91 "Creating new query '{}' (uuid: {}) for node '{}' (uuid: {})",
92 layout.label,
93 layout.uuid,
94 self.source.label,
95 self.source.uuid
96 );
97
98 Ok(Query::new(
99 tx,
100 rx,
101 self.clock.clone(),
102 self.source.clone(),
103 layout,
104 ))
105 }
106}