flarrow_api/io/
queries.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::tokio::sync::Mutex;
5
6type SharedMap<K, V> = Arc<Mutex<HashMap<K, V>>>;
7type Senders = SharedMap<Uuid, MessageSender>;
8type Receivers = SharedMap<Uuid, MessageReceiver>;
9
10/// Queries let you manage query connections during a node *implementation*
11pub struct Queries {
12    senders: Senders,
13    receivers: Receivers,
14
15    clock: Arc<uhlc::HLC>,
16
17    source: NodeLayout,
18}
19
20impl Queries {
21    /// Creates a new instance of `Queries`.
22    pub fn new(
23        senders: Senders,
24        receivers: Receivers,
25        clock: Arc<uhlc::HLC>,
26        source: NodeLayout,
27    ) -> Self {
28        Self {
29            senders,
30            receivers,
31            clock,
32            source,
33        }
34    }
35
36    async fn compute(
37        &mut self,
38        query: impl Into<String>,
39    ) -> Result<(MessageSender, MessageReceiver, QueryLayout)> {
40        let label: String = query.into();
41        let layout = self.source.query(&label);
42
43        let sender = self
44            .senders
45            .lock()
46            .await
47            .remove(&layout.uuid)
48            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
49
50        let receiver = self
51            .receivers
52            .lock()
53            .await
54            .remove(&layout.uuid)
55            .ok_or_eyre(report_io_not_found(&self.source, &layout))?;
56
57        Ok((sender, receiver, layout))
58    }
59
60    /// Creates a new raw Query, this raw query has no type information so you have
61    /// to manually transform it
62    pub async fn raw(&mut self, query: impl Into<String>) -> Result<RawQuery> {
63        let (tx, rx, layout) = self.compute(query).await?;
64
65        tracing::debug!(
66            "Creating new raw query '{}' (uuid: {}) for node '{}' (uuid: {})",
67            layout.label,
68            layout.uuid,
69            self.source.label,
70            self.source.uuid
71        );
72
73        Ok(RawQuery::new(
74            tx,
75            rx,
76            self.clock.clone(),
77            self.source.clone(),
78            layout,
79        ))
80    }
81
82    /// Creates a new query, this query has type information
83    pub async fn with<T: ArrowMessage, F: ArrowMessage>(
84        &mut self,
85        query: impl Into<String>,
86    ) -> Result<Query<T, F>> {
87        let (tx, rx, layout) = self.compute(query).await?;
88
89        tracing::debug!(
90            "Creating new query '{}' (uuid: {}) for node '{}' (uuid: {})",
91            layout.label,
92            layout.uuid,
93            self.source.label,
94            self.source.uuid
95        );
96
97        Ok(Query::new(
98            tx,
99            rx,
100            self.clock.clone(),
101            self.source.clone(),
102            layout,
103        ))
104    }
105}