// flarrow_api/io/queryables.rs
use std::{collections::HashMap, sync::Arc};

use crate::prelude::*;
use thirdparty::tokio::sync::Mutex;

6type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
7type Senders = SharedMap<Uuid, HashMap<Uuid, MessageSender>>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10pub struct Queryables {
12 senders: Senders,
13 receivers: Receivers,
14
15 clock: Arc<uhlc::HLC>,
16
17 source: NodeLayout,
18}
19
20impl Queryables {
21 pub fn new(
23 senders: Senders,
24 receivers: Receivers,
25 clock: Arc<uhlc::HLC>,
26 source: NodeLayout,
27 ) -> Self {
28 Self {
29 clock,
30 senders,
31 receivers,
32 source,
33 }
34 }
35
36 async fn compute(
37 &mut self,
38 queryable: impl Into<String>,
39 ) -> Result<(
40 HashMap<Uuid, MessageSender>,
41 MessageReceiver,
42 QueryableLayout,
43 )> {
44 let label: String = queryable.into();
45 let layout = self.source.queryable(&label);
46
47 let senders = self
48 .senders
49 .lock()
50 .await
51 .remove(&layout.uuid)
52 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
53
54 let receiver = self
55 .receivers
56 .lock()
57 .await
58 .remove(&layout.uuid)
59 .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
60
61 Ok((senders, receiver, layout))
62 }
63
64 pub async fn raw(&mut self, queryable: impl Into<String>) -> Result<RawQueryable> {
67 let (senders, receivers, layout) = self.compute(queryable).await?;
68
69 tracing::debug!(
70 "Creating new raw queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
71 layout.label,
72 layout.uuid,
73 self.source.label,
74 self.source.uuid
75 );
76
77 Ok(RawQueryable::new(
78 senders,
79 receivers,
80 self.clock.clone(),
81 self.source.clone(),
82 layout,
83 ))
84 }
85
86 pub async fn with<T: ArrowMessage, F: ArrowMessage>(
88 &mut self,
89 queryable: impl Into<String>,
90 ) -> Result<Queryable<T, F>> {
91 let (senders, receivers, layout) = self.compute(queryable).await?;
92
93 tracing::debug!(
94 "Creating new queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
95 layout.label,
96 layout.uuid,
97 self.source.label,
98 self.source.uuid
99 );
100
101 Ok(Queryable::new(
102 senders,
103 receivers,
104 self.clock.clone(),
105 self.source.clone(),
106 layout,
107 ))
108 }
109}