iridis_node/primitives/
queryables.rs

1//! This module contains implementations for this primitive.
2
3use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::tokio::sync::Mutex, *};
6
// A `HashMap` guarded by the Tokio async `Mutex` imported above and shared through an `Arc`.
7type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
// Sender groups; the outer key is looked up with a queryable's uuid in `Queryables::compute`.
// NOTE(review): the meaning of the inner `Uuid` key is not visible in this file — confirm.
8type Senders = SharedMap<Uuid, HashMap<Uuid, MessageSender>>;
// One receiver per entry, looked up with the same queryable uuid as `Senders`.
9type Receivers = SharedMap<Uuid, MessageReceiver>;
10
11/// Queryables let you manage queryable connections during a node *implementation*
///
/// It hands out [`RawQueryable`] and [`Queryable`] values built from the channels stored
/// in the shared `senders` / `receivers` maps.
12pub struct Queryables {
    // Shared map the sender groups are taken from, keyed by queryable uuid.
13    senders: Senders,
    // Shared map the receivers are taken from, keyed by queryable uuid.
14    receivers: Receivers,
15
    // Hybrid logical clock handle; cloned into every queryable created from this value.
16    clock: Arc<uhlc::HLC>,
17
    // Identifier of the node these queryables belong to; used to derive queryable layouts.
18    source: NodeID,
19}
20
21impl Queryables {
22    /// Creates a new instance of 'Queryables'
23    pub fn new(
24        senders: Senders,
25        receivers: Receivers,
26        clock: Arc<uhlc::HLC>,
27        source: NodeID,
28    ) -> Self {
29        Self {
30            clock,
31            senders,
32            receivers,
33            source,
34        }
35    }
36
37    async fn compute(
38        &mut self,
39        queryable: impl Into<String>,
40    ) -> Result<(HashMap<Uuid, MessageSender>, MessageReceiver, QueryableID)> {
41        let label: String = queryable.into();
42        let layout = self.source.queryable(&label);
43
44        let senders = self
45            .senders
46            .lock()
47            .await
48            .remove(&layout.uuid)
49            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
50
51        let receiver = self
52            .receivers
53            .lock()
54            .await
55            .remove(&layout.uuid)
56            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
57
58        Ok((senders, receiver, layout))
59    }
60
61    /// Creates a new raw Queryable, this raw queryable has no type information so you have
62    /// to manually transform it
63    pub async fn raw(&mut self, queryable: impl Into<String>) -> Result<RawQueryable> {
64        let (senders, receivers, layout) = self.compute(queryable).await?;
65
66        tracing::debug!(
67            "Creating new raw queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
68            layout.label,
69            layout.uuid,
70            self.source.label,
71            self.source.uuid
72        );
73
74        Ok(RawQueryable::new(
75            senders,
76            receivers,
77            self.clock.clone(),
78            self.source.clone(),
79            layout,
80        ))
81    }
82
83    /// Creates a new typed Queryable, this queryable has type information
84    pub async fn with<T: ArrowMessage, F: ArrowMessage>(
85        &mut self,
86        queryable: impl Into<String>,
87    ) -> Result<Queryable<T, F>> {
88        let (senders, receivers, layout) = self.compute(queryable).await?;
89
90        tracing::debug!(
91            "Creating new queryable '{}' (uuid: {}) for node '{}' (uuid: {})",
92            layout.label,
93            layout.uuid,
94            self.source.label,
95            self.source.uuid
96        );
97
98        Ok(Queryable::new(
99            senders,
100            receivers,
101            self.clock.clone(),
102            self.source.clone(),
103            layout,
104        ))
105    }
106}