typed_clickhouse/
watch.rs1use std::fmt::Write;
2
3use serde::Deserialize;
4use sha1::{Digest, Sha1};
5
6use crate::{
7 error::Result,
8 introspection::Reflection,
9 query,
10 sql_builder::{Bind, SqlBuilder},
11 Client,
12};
13
14pub struct Watch {
15 client: Client,
16 sql: SqlBuilder,
17 limit: Option<usize>,
18}
19
20impl Watch {
21 pub(crate) fn new(client: &Client, template: &str) -> Self {
22 Self {
23 client: client.clone(),
24 sql: SqlBuilder::new(template),
25 limit: None,
26 }
27 }
28
29 pub fn bind(mut self, value: impl Bind) -> Self {
30 self.sql.bind_arg(value);
31 self
32 }
33
34 pub fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
37 self.limit = limit.into();
38 self
39 }
40
41 pub fn rows<T: Reflection>(self) -> Result<RowCursor<T>> {
42 Ok(RowCursor(self.cursor(false)?))
43 }
44
45 pub fn events(self) -> Result<EventCursor> {
46 Ok(EventCursor(self.cursor(true)?))
47 }
48
49 fn cursor<T: Reflection>(self, only_events: bool) -> Result<RawCursor<T>> {
52 let sql = self.sql.finish()?;
53 let (sql, view) = if is_table_name(&sql) {
54 (None, sql)
55 } else {
56 let view = make_live_view_name(&sql);
57 (Some(sql), view)
58 };
59
60 Ok(RawCursor::Preparing {
61 client: self.client,
62 sql,
63 view,
64 limit: self.limit,
65 only_events,
66 })
67 }
68}
69
70pub type Version = u64; pub struct EventCursor(RawCursor<()>);
73
74impl EventCursor {
75 pub async fn next(&mut self) -> Result<Option<Version>> {
76 Ok(self.0.next().await?.map(|(_, version)| version))
77 }
78}
79
80pub struct RowCursor<T>(RawCursor<T>);
81
82impl<T> RowCursor<T> {
83 pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<(Version, T)>>
84 where
85 T: Deserialize<'b> + Reflection,
86 {
87 Ok(self.0.next().await?.map(|(row, version)| (version, row)))
88 }
89}
90
91enum RawCursor<T> {
92 Preparing {
93 client: Client,
94 sql: Option<String>,
95 view: String,
96 limit: Option<usize>,
97 only_events: bool,
98 },
99 Fetching(query::RowCursor<(T, Version)>),
100}
101
102impl<T> RawCursor<T> {
103 pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<(T, Version)>>
104 where
105 T: Deserialize<'b> + Reflection,
106 {
107 if let RawCursor::Preparing {
108 client,
109 sql,
110 view,
111 limit,
112 only_events,
113 } = self
114 {
115 if let Some(sql) = sql {
116 let create_sql = format!(
117 "CREATE LIVE VIEW IF NOT EXISTS {} WITH TIMEOUT AS {}",
118 view, sql
119 );
120 client.query(&create_sql).execute().await?;
121 }
122
123 let events = if *only_events { " EVENTS" } else { "" };
124 let watch_sql = match limit {
125 Some(limit) => format!("WATCH {}{} LIMIT {}", view, events, limit),
126 None => format!("WATCH {}{}", view, events),
127 };
128
129 let cursor = client.query(&watch_sql).rows()?;
130 *self = RawCursor::Fetching(cursor);
131 }
132
133 match self {
134 RawCursor::Preparing { .. } => unreachable!(),
135 RawCursor::Fetching(cursor) => Ok(cursor.next().await?),
136 }
137 }
138}
139
140fn is_table_name(sql: &str) -> bool {
141 sql.split_ascii_whitespace().take(2).count() == 1
143}
144
145fn make_live_view_name(sql: &str) -> String {
146 let mut hasher = Sha1::new();
147 hasher.update(sql.as_bytes());
148 let result = hasher.finalize();
149
150 let mut name = String::with_capacity(40);
151 for word in &result[..] {
152 let _ = write!(&mut name, "{:02x}", word);
153 }
154
155 format!("lv_{}", name)
156}
157
158#[test]
159fn it_makes_live_view_name() {
160 let a = make_live_view_name("SELECT 1");
161 let b = make_live_view_name("SELECT 2");
162
163 assert_ne!(a, b);
164 assert_eq!(a.len(), 3 + 40);
165 assert_eq!(b.len(), 3 + 40);
166}