flarrow_api/io/
queryables.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::tokio::sync::Mutex;
5
/// A `HashMap` guarded by an async (tokio) `Mutex` and shared through an `Arc`.
type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
/// Shared map from a `Uuid` to a set of message senders, the set itself being keyed by `Uuid`.
type Senders = SharedMap<Uuid, HashMap<Uuid, MessageSender>>;
/// Shared map from a `Uuid` to a single message receiver.
type Receivers = SharedMap<Uuid, MessageReceiver>;
9
/// Queryables let you manage queryable connections during a node *implementation*
///
/// Each queryable is looked up by label through the node layout and its channels are
/// taken out of the shared maps when a `RawQueryable` or `Queryable` is created.
pub struct Queryables {
    /// Shared sender sets, looked up by the uuid of the queryable layout.
    senders: Senders,
    /// Shared receivers, looked up by the uuid of the queryable layout.
    receivers: Receivers,

    /// Clock handle; a clone is handed to every queryable created from this struct.
    clock: Arc<uhlc::HLC>,

    /// Layout of the node these queryables belong to; used to resolve queryable labels.
    source: NodeLayout,
}
19
20impl Queryables {
21    /// Creates a new instance of 'Queryables'
22    pub fn new(
23        senders: Senders,
24        receivers: Receivers,
25        clock: Arc<uhlc::HLC>,
26        source: NodeLayout,
27    ) -> Self {
28        Self {
29            clock,
30            senders,
31            receivers,
32            source,
33        }
34    }
35
36    async fn compute(
37        &mut self,
38        queryable: impl Into<String>,
39    ) -> Result<(
40        HashMap<Uuid, MessageSender>,
41        MessageReceiver,
42        QueryableLayout,
43    )> {
44        let label: String = queryable.into();
45        let layout = self.source.queryable(&label);
46
47        let senders = self
48            .senders
49            .lock()
50            .await
51            .remove(&layout.uuid)
52            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
53
54        let receiver = self
55            .receivers
56            .lock()
57            .await
58            .remove(&layout.uuid)
59            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
60
61        Ok((senders, receiver, layout))
62    }
63
64    /// Creates a new raw Queryable, this raw queryable has no type information so you have
65    /// to manually transform it
66    pub async fn raw(&mut self, queryable: impl Into<String>) -> Result<RawQueryable> {
67        let (senders, receivers, layout) = self.compute(queryable).await?;
68
69        tracing::debug!(
70            "Creating new raw queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
71            layout.label,
72            layout.uuid,
73            self.source.label,
74            self.source.uuid
75        );
76
77        Ok(RawQueryable::new(
78            senders,
79            receivers,
80            self.clock.clone(),
81            self.source.clone(),
82            layout,
83        ))
84    }
85
86    /// Creates a new typed Queryable, this queryable has type information
87    pub async fn with<T: ArrowMessage, F: ArrowMessage>(
88        &mut self,
89        queryable: impl Into<String>,
90    ) -> Result<Queryable<T, F>> {
91        let (senders, receivers, layout) = self.compute(queryable).await?;
92
93        tracing::debug!(
94            "Creating new queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
95            layout.label,
96            layout.uuid,
97            self.source.label,
98            self.source.uuid
99        );
100
101        Ok(Queryable::new(
102            senders,
103            receivers,
104            self.clock.clone(),
105            self.source.clone(),
106            layout,
107        ))
108    }
109}