iridis_node/primitives/
queryables.rs1use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
6
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
8type Senders = SharedMap<Uuid, HashMap<Uuid, MessageSender>>;
9type Receivers = SharedMap<Uuid, MessageReceiver>;
10
11pub struct Queryables {
13 senders: Senders,
14 receivers: Receivers,
15
16 clock: Arc<uhlc::HLC>,
17
18 source: NodeID,
19}
20
21impl Queryables {
22 pub fn new(
24 senders: Senders,
25 receivers: Receivers,
26 clock: Arc<uhlc::HLC>,
27 source: NodeID,
28 ) -> Self {
29 Self {
30 clock,
31 senders,
32 receivers,
33 source,
34 }
35 }
36
37 async fn compute(
38 &mut self,
39 queryable: impl Into<String>,
40 ) -> Result<(HashMap<Uuid, MessageSender>, MessageReceiver, QueryableID)> {
41 let label: String = queryable.into();
42 let layout = self.source.queryable(&label);
43
44 let senders = self
45 .senders
46 .lock()
47 .await
48 .remove(&layout.uuid)
49 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
50
51 let receiver = self
52 .receivers
53 .lock()
54 .await
55 .remove(&layout.uuid)
56 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
57
58 Ok((senders, receiver, layout))
59 }
60
61 pub async fn raw(&mut self, queryable: impl Into<String>) -> Result<RawQueryable> {
64 let (senders, receivers, layout) = self.compute(queryable).await?;
65
66 tracing::debug!(
67 "Creating new raw queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
68 layout.label,
69 layout.uuid,
70 self.source.label,
71 self.source.uuid
72 );
73
74 Ok(RawQueryable::new(
75 senders,
76 receivers,
77 self.clock.clone(),
78 self.source.clone(),
79 layout,
80 ))
81 }
82
83 pub async fn with<T: ArrowMessage, F: ArrowMessage>(
85 &mut self,
86 queryable: impl Into<String>,
87 ) -> Result<Queryable<T, F>> {
88 let (senders, receivers, layout) = self.compute(queryable).await?;
89
90 tracing::debug!(
91 "Creating new queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
92 layout.label,
93 layout.uuid,
94 self.source.label,
95 self.source.uuid
96 );
97
98 Ok(Queryable::new(
99 senders,
100 receivers,
101 self.clock.clone(),
102 self.source.clone(),
103 layout,
104 ))
105 }
106}