typed_clickhouse/
watch.rs

1use std::fmt::Write;
2
3use serde::Deserialize;
4use sha1::{Digest, Sha1};
5
6use crate::{
7    error::Result,
8    introspection::Reflection,
9    query,
10    sql_builder::{Bind, SqlBuilder},
11    Client,
12};
13
14pub struct Watch {
15    client: Client,
16    sql: SqlBuilder,
17    limit: Option<usize>,
18}
19
20impl Watch {
21    pub(crate) fn new(client: &Client, template: &str) -> Self {
22        Self {
23            client: client.clone(),
24            sql: SqlBuilder::new(template),
25            limit: None,
26        }
27    }
28
29    pub fn bind(mut self, value: impl Bind) -> Self {
30        self.sql.bind_arg(value);
31        self
32    }
33
34    // TODO: `timeout()`.
35
36    pub fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
37        self.limit = limit.into();
38        self
39    }
40
41    pub fn rows<T: Reflection>(self) -> Result<RowCursor<T>> {
42        Ok(RowCursor(self.cursor(false)?))
43    }
44
45    pub fn events(self) -> Result<EventCursor> {
46        Ok(EventCursor(self.cursor(true)?))
47    }
48
49    // TODO: `groups()` for `(Version, &[T])`.
50
51    fn cursor<T: Reflection>(self, only_events: bool) -> Result<RawCursor<T>> {
52        let sql = self.sql.finish()?;
53        let (sql, view) = if is_table_name(&sql) {
54            (None, sql)
55        } else {
56            let view = make_live_view_name(&sql);
57            (Some(sql), view)
58        };
59
60        Ok(RawCursor::Preparing {
61            client: self.client,
62            sql,
63            view,
64            limit: self.limit,
65            only_events,
66        })
67    }
68}
69
70pub type Version = u64; // TODO: NonZeroU64
71
72pub struct EventCursor(RawCursor<()>);
73
74impl EventCursor {
75    pub async fn next(&mut self) -> Result<Option<Version>> {
76        Ok(self.0.next().await?.map(|(_, version)| version))
77    }
78}
79
80pub struct RowCursor<T>(RawCursor<T>);
81
82impl<T> RowCursor<T> {
83    pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<(Version, T)>>
84    where
85        T: Deserialize<'b> + Reflection,
86    {
87        Ok(self.0.next().await?.map(|(row, version)| (version, row)))
88    }
89}
90
91enum RawCursor<T> {
92    Preparing {
93        client: Client,
94        sql: Option<String>,
95        view: String,
96        limit: Option<usize>,
97        only_events: bool,
98    },
99    Fetching(query::RowCursor<(T, Version)>),
100}
101
102impl<T> RawCursor<T> {
103    pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<(T, Version)>>
104    where
105        T: Deserialize<'b> + Reflection,
106    {
107        if let RawCursor::Preparing {
108            client,
109            sql,
110            view,
111            limit,
112            only_events,
113        } = self
114        {
115            if let Some(sql) = sql {
116                let create_sql = format!(
117                    "CREATE LIVE VIEW IF NOT EXISTS {} WITH TIMEOUT AS {}",
118                    view, sql
119                );
120                client.query(&create_sql).execute().await?;
121            }
122
123            let events = if *only_events { " EVENTS" } else { "" };
124            let watch_sql = match limit {
125                Some(limit) => format!("WATCH {}{} LIMIT {}", view, events, limit),
126                None => format!("WATCH {}{}", view, events),
127            };
128
129            let cursor = client.query(&watch_sql).rows()?;
130            *self = RawCursor::Fetching(cursor);
131        }
132
133        match self {
134            RawCursor::Preparing { .. } => unreachable!(),
135            RawCursor::Fetching(cursor) => Ok(cursor.next().await?),
136        }
137    }
138}
139
140fn is_table_name(sql: &str) -> bool {
141    // TODO: support quoted identifiers.
142    sql.split_ascii_whitespace().take(2).count() == 1
143}
144
145fn make_live_view_name(sql: &str) -> String {
146    let mut hasher = Sha1::new();
147    hasher.update(sql.as_bytes());
148    let result = hasher.finalize();
149
150    let mut name = String::with_capacity(40);
151    for word in &result[..] {
152        let _ = write!(&mut name, "{:02x}", word);
153    }
154
155    format!("lv_{}", name)
156}
157
158#[test]
159fn it_makes_live_view_name() {
160    let a = make_live_view_name("SELECT 1");
161    let b = make_live_view_name("SELECT 2");
162
163    assert_ne!(a, b);
164    assert_eq!(a.len(), 3 + 40);
165    assert_eq!(b.len(), 3 + 40);
166}