1use diesel::backend::Backend;
4use diesel::expression::functions::define_sql_function;
5use diesel::expression::{AppearsOnTable, Expression, SelectableExpression, ValidGrouping};
6use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId};
7use diesel::query_dsl::RunQueryDsl;
8use diesel::result::{Error, QueryResult};
9use diesel::sql_types::{BigInt, SingleValue, SqlType};
10
11use crate::backend::ClickHouse;
12
13define_sql_function! {
16 #[sql_name = "row_number"]
18 fn row_number() -> BigInt;
19}
20
21define_sql_function! {
22 #[sql_name = "rank"]
24 fn rank() -> BigInt;
25}
26
27define_sql_function! {
28 #[sql_name = "dense_rank"]
30 fn dense_rank() -> BigInt;
31}
32
33define_sql_function! {
34 #[sql_name = "lag"]
36 fn lag<T: SqlType + SingleValue>(expr: T) -> T;
37}
38
39define_sql_function! {
40 #[sql_name = "lead"]
42 fn lead<T: SqlType + SingleValue>(expr: T) -> T;
43}
44
45define_sql_function! {
46 #[sql_name = "lagInFrame"]
48 fn lag_in_frame<T: SqlType + SingleValue>(expr: T, offset: BigInt, default: T) -> T;
49}
50
51define_sql_function! {
52 #[sql_name = "leadInFrame"]
54 fn lead_in_frame<T: SqlType + SingleValue>(expr: T, offset: BigInt, default: T) -> T;
55}
56
57define_sql_function! {
58 #[sql_name = "first_value"]
60 fn first_value<T: SqlType + SingleValue>(expr: T) -> T;
61}
62
63define_sql_function! {
64 #[sql_name = "last_value"]
66 fn last_value<T: SqlType + SingleValue>(expr: T) -> T;
67}
68
69pub fn partition_by<Expr>(expr: Expr) -> WindowSpec<WindowPartition<Expr>>
71where
72 Expr: Expression,
73{
74 WindowSpec {
75 partition: WindowPartition(expr),
76 order: NoWindowOrder,
77 frame: NoWindowFrame,
78 }
79}
80
81pub fn window_order_by<Expr>(expr: Expr) -> WindowSpec<NoWindowPartition, WindowOrder<Expr>>
83where
84 Expr: Expression,
85{
86 WindowSpec {
87 partition: NoWindowPartition,
88 order: WindowOrder(expr),
89 frame: NoWindowFrame,
90 }
91}
92
93pub fn qualify<Q, Predicate>(query: Q, predicate: Predicate) -> QualifyQuery<Q, Predicate>
95where
96 Predicate: Expression,
97{
98 QualifyQuery { query, predicate }
99}
100
101pub fn window<Q, Spec>(
103 query: Q,
104 name: impl Into<String>,
105 spec: Spec,
106) -> WindowQuery<Q, WindowBinding<NoWindowBindings, Spec>> {
107 WindowQuery {
108 query,
109 bindings: WindowBinding {
110 tail: NoWindowBindings,
111 name: name.into(),
112 spec,
113 },
114 }
115}
116
117#[derive(Debug, Clone, Copy)]
119pub struct Over<Expr, Spec> {
120 expr: Expr,
121 spec: Spec,
122}
123
124#[derive(Debug, Clone)]
126pub struct OverWindow<Expr> {
127 expr: Expr,
128 name: String,
129}
130
131pub trait OverDsl: Expression + Sized {
133 fn over_ch<Spec>(self, spec: Spec) -> Over<Self, Spec> {
138 Over { expr: self, spec }
139 }
140
141 fn over_window(self, name: impl Into<String>) -> OverWindow<Self> {
143 OverWindow {
144 expr: self,
145 name: name.into(),
146 }
147 }
148}
149
150impl<T> OverDsl for T where T: Expression {}
151
152#[derive(Debug, Clone, Copy)]
154pub struct WindowSpec<Partition = NoWindowPartition, Order = NoWindowOrder, Frame = NoWindowFrame> {
155 partition: Partition,
156 order: Order,
157 frame: Frame,
158}
159
160#[derive(Debug, Clone, Copy, Default)]
162pub struct NoWindowPartition;
163
164#[derive(Debug, Clone, Copy)]
166pub struct WindowPartition<Expr>(Expr);
167
168#[derive(Debug, Clone, Copy, Default)]
170pub struct NoWindowOrder;
171
172#[derive(Debug, Clone, Copy)]
174pub struct WindowOrder<Expr>(Expr);
175
176#[derive(Debug, Clone, Copy, Default)]
178pub struct NoWindowFrame;
179
180#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
182pub enum WindowFrameUnits {
183 Rows,
185 Range,
187}
188
189#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
191pub enum WindowFrameBound {
192 UnboundedPreceding,
194 Preceding(u64),
196 CurrentRow,
198 Following(u64),
200 UnboundedFollowing,
202}
203
204#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
206pub struct WindowFrame {
207 units: WindowFrameUnits,
208 start: WindowFrameBound,
209 end: WindowFrameBound,
210}
211
212pub type RowsBetweenUnboundedPrecedingAndCurrentRow = WindowFrame;
214
215impl WindowFrameBound {
216 pub fn preceding(offset: u64) -> Self {
218 Self::Preceding(offset)
219 }
220
221 pub fn following(offset: u64) -> Self {
223 Self::Following(offset)
224 }
225}
226
227impl<Partition, Order, Frame> WindowSpec<Partition, Order, Frame> {
228 pub fn order_by<Expr>(self, expr: Expr) -> WindowSpec<Partition, WindowOrder<Expr>, Frame>
230 where
231 Expr: Expression,
232 {
233 WindowSpec {
234 partition: self.partition,
235 order: WindowOrder(expr),
236 frame: self.frame,
237 }
238 }
239
240 pub fn rows_between(
242 self,
243 start: WindowFrameBound,
244 end: WindowFrameBound,
245 ) -> WindowSpec<Partition, Order, WindowFrame> {
246 self.with_frame(WindowFrame {
247 units: WindowFrameUnits::Rows,
248 start,
249 end,
250 })
251 }
252
253 pub fn range_between(
255 self,
256 start: WindowFrameBound,
257 end: WindowFrameBound,
258 ) -> WindowSpec<Partition, Order, WindowFrame> {
259 self.with_frame(WindowFrame {
260 units: WindowFrameUnits::Range,
261 start,
262 end,
263 })
264 }
265
266 pub fn rows_between_unbounded_preceding_and_current_row(
268 self,
269 ) -> WindowSpec<Partition, Order, WindowFrame> {
270 self.rows_between(
271 WindowFrameBound::UnboundedPreceding,
272 WindowFrameBound::CurrentRow,
273 )
274 }
275
276 pub fn rows_between_unbounded_preceding_and_unbounded_following(
278 self,
279 ) -> WindowSpec<Partition, Order, WindowFrame> {
280 self.rows_between(
281 WindowFrameBound::UnboundedPreceding,
282 WindowFrameBound::UnboundedFollowing,
283 )
284 }
285
286 pub fn rows_between_preceding_and_current_row(
288 self,
289 preceding: u64,
290 ) -> WindowSpec<Partition, Order, WindowFrame> {
291 self.rows_between(
292 WindowFrameBound::Preceding(preceding),
293 WindowFrameBound::CurrentRow,
294 )
295 }
296
297 pub fn rows_between_preceding_and_following(
299 self,
300 preceding: u64,
301 following: u64,
302 ) -> WindowSpec<Partition, Order, WindowFrame> {
303 self.rows_between(
304 WindowFrameBound::Preceding(preceding),
305 WindowFrameBound::Following(following),
306 )
307 }
308
309 pub fn rows_between_current_row_and_following(
311 self,
312 following: u64,
313 ) -> WindowSpec<Partition, Order, WindowFrame> {
314 self.rows_between(
315 WindowFrameBound::CurrentRow,
316 WindowFrameBound::Following(following),
317 )
318 }
319
320 pub fn range_between_unbounded_preceding_and_current_row(
322 self,
323 ) -> WindowSpec<Partition, Order, WindowFrame> {
324 self.range_between(
325 WindowFrameBound::UnboundedPreceding,
326 WindowFrameBound::CurrentRow,
327 )
328 }
329
330 pub fn range_between_preceding_and_current_row(
332 self,
333 preceding: u64,
334 ) -> WindowSpec<Partition, Order, WindowFrame> {
335 self.range_between(
336 WindowFrameBound::Preceding(preceding),
337 WindowFrameBound::CurrentRow,
338 )
339 }
340
341 fn with_frame<NewFrame>(self, frame: NewFrame) -> WindowSpec<Partition, Order, NewFrame> {
342 WindowSpec {
343 partition: self.partition,
344 order: self.order,
345 frame,
346 }
347 }
348}
349
350#[derive(Debug, Clone, Copy)]
352pub struct QualifyQuery<Q, Predicate> {
353 query: Q,
354 predicate: Predicate,
355}
356
357#[derive(Debug, Clone, Copy)]
359pub struct WindowQuery<Q, Bindings> {
360 query: Q,
361 bindings: Bindings,
362}
363
364#[derive(Debug, Clone, Copy, Default)]
366pub struct NoWindowBindings;
367
368#[derive(Debug, Clone)]
370pub struct WindowBinding<Tail, Spec> {
371 tail: Tail,
372 name: String,
373 spec: Spec,
374}
375
376impl<Q, Bindings> WindowQuery<Q, Bindings> {
377 pub fn and_window<Spec>(
379 self,
380 name: impl Into<String>,
381 spec: Spec,
382 ) -> WindowQuery<Q, WindowBinding<Bindings, Spec>> {
383 WindowQuery {
384 query: self.query,
385 bindings: WindowBinding {
386 tail: self.bindings,
387 name: name.into(),
388 spec,
389 },
390 }
391 }
392}
393
394impl<Expr, Spec> Expression for Over<Expr, Spec>
395where
396 Expr: Expression,
397{
398 type SqlType = Expr::SqlType;
399}
400
401impl<Expr> Expression for OverWindow<Expr>
402where
403 Expr: Expression,
404{
405 type SqlType = Expr::SqlType;
406}
407
408impl<Expr, Spec, GB> ValidGrouping<GB> for Over<Expr, Spec>
409where
410 Expr: ValidGrouping<GB>,
411{
412 type IsAggregate = Expr::IsAggregate;
413}
414
415impl<Expr, GB> ValidGrouping<GB> for OverWindow<Expr>
416where
417 Expr: ValidGrouping<GB>,
418{
419 type IsAggregate = Expr::IsAggregate;
420}
421
422impl<Expr, Spec, QS> AppearsOnTable<QS> for Over<Expr, Spec>
423where
424 Expr: AppearsOnTable<QS>,
425 Self: Expression,
426{
427}
428
429impl<Expr, QS> AppearsOnTable<QS> for OverWindow<Expr>
430where
431 Expr: AppearsOnTable<QS>,
432 Self: Expression,
433{
434}
435
436impl<Expr, Spec, QS> SelectableExpression<QS> for Over<Expr, Spec> where Self: AppearsOnTable<QS> {}
437impl<Expr, QS> SelectableExpression<QS> for OverWindow<Expr> where Self: AppearsOnTable<QS> {}
438
439impl<Expr, Spec> QueryId for Over<Expr, Spec>
440where
441 Expr: QueryId,
442{
443 type QueryId = Over<Expr::QueryId, ()>;
444 const HAS_STATIC_QUERY_ID: bool = false;
445}
446
447impl<Expr> QueryId for OverWindow<Expr>
448where
449 Expr: QueryId,
450{
451 type QueryId = OverWindow<Expr::QueryId>;
452 const HAS_STATIC_QUERY_ID: bool = false;
453}
454
455impl<Q, Predicate> Query for QualifyQuery<Q, Predicate>
456where
457 Q: Query,
458{
459 type SqlType = Q::SqlType;
460}
461
462impl<Q, Predicate, Conn> RunQueryDsl<Conn> for QualifyQuery<Q, Predicate> {}
463
464impl<Q, Predicate> QueryId for QualifyQuery<Q, Predicate>
465where
466 Q: QueryId,
467{
468 type QueryId = QualifyQuery<Q::QueryId, ()>;
469 const HAS_STATIC_QUERY_ID: bool = false;
470}
471
472impl<Q, Bindings> Query for WindowQuery<Q, Bindings>
473where
474 Q: Query,
475{
476 type SqlType = Q::SqlType;
477}
478
479impl<Q, Bindings, Conn> RunQueryDsl<Conn> for WindowQuery<Q, Bindings> {}
480
481impl<Q, Bindings> QueryId for WindowQuery<Q, Bindings>
482where
483 Q: QueryId,
484{
485 type QueryId = WindowQuery<Q::QueryId, ()>;
486 const HAS_STATIC_QUERY_ID: bool = false;
487}
488
489impl<Expr, Spec, DB> QueryFragment<DB> for Over<Expr, Spec>
490where
491 DB: Backend,
492 Expr: QueryFragment<DB>,
493 Spec: QueryFragment<DB>,
494{
495 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
496 self.expr.walk_ast(out.reborrow())?;
497 out.push_sql(" OVER (");
498 self.spec.walk_ast(out.reborrow())?;
499 out.push_sql(")");
500 Ok(())
501 }
502}
503
504impl<Expr> QueryFragment<ClickHouse> for OverWindow<Expr>
505where
506 Expr: QueryFragment<ClickHouse>,
507{
508 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
509 self.expr.walk_ast(out.reborrow())?;
510 validate_bare_identifier(&self.name, "window name")?;
511 out.push_sql(" OVER ");
512 out.push_identifier(&self.name)?;
513 Ok(())
514 }
515}
516
517trait WindowSpecPart {
518 fn is_empty(&self) -> bool;
519 fn walk_part<'b>(&'b self, out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()>;
520}
521
522impl WindowSpecPart for NoWindowPartition {
523 fn is_empty(&self) -> bool {
524 true
525 }
526
527 fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
528 Ok(())
529 }
530}
531
532impl<Expr> WindowSpecPart for WindowPartition<Expr>
533where
534 Expr: QueryFragment<ClickHouse>,
535{
536 fn is_empty(&self) -> bool {
537 false
538 }
539
540 fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
541 out.push_sql("PARTITION BY ");
542 self.0.walk_ast(out.reborrow())?;
543 Ok(())
544 }
545}
546
547impl WindowSpecPart for NoWindowOrder {
548 fn is_empty(&self) -> bool {
549 true
550 }
551
552 fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
553 Ok(())
554 }
555}
556
557impl<Expr> WindowSpecPart for WindowOrder<Expr>
558where
559 Expr: QueryFragment<ClickHouse>,
560{
561 fn is_empty(&self) -> bool {
562 false
563 }
564
565 fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
566 out.push_sql("ORDER BY ");
567 self.0.walk_ast(out.reborrow())?;
568 Ok(())
569 }
570}
571
572impl WindowSpecPart for NoWindowFrame {
573 fn is_empty(&self) -> bool {
574 true
575 }
576
577 fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
578 Ok(())
579 }
580}
581
582impl WindowSpecPart for WindowFrame {
583 fn is_empty(&self) -> bool {
584 false
585 }
586
587 fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
588 out.push_sql(match self.units {
589 WindowFrameUnits::Rows => "ROWS BETWEEN ",
590 WindowFrameUnits::Range => "RANGE BETWEEN ",
591 });
592 push_frame_bound(&mut out, self.start);
593 out.push_sql(" AND ");
594 push_frame_bound(&mut out, self.end);
595 Ok(())
596 }
597}
598
599fn push_frame_bound<DB>(out: &mut AstPass<'_, '_, DB>, bound: WindowFrameBound)
600where
601 DB: Backend,
602{
603 match bound {
604 WindowFrameBound::UnboundedPreceding => out.push_sql("UNBOUNDED PRECEDING"),
605 WindowFrameBound::Preceding(offset) => {
606 out.push_sql(&offset.to_string());
607 out.push_sql(" PRECEDING");
608 }
609 WindowFrameBound::CurrentRow => out.push_sql("CURRENT ROW"),
610 WindowFrameBound::Following(offset) => {
611 out.push_sql(&offset.to_string());
612 out.push_sql(" FOLLOWING");
613 }
614 WindowFrameBound::UnboundedFollowing => out.push_sql("UNBOUNDED FOLLOWING"),
615 }
616}
617
618impl<Partition, Order, Frame> QueryFragment<ClickHouse> for WindowSpec<Partition, Order, Frame>
619where
620 Partition: WindowSpecPart,
621 Order: WindowSpecPart,
622 Frame: WindowSpecPart,
623{
624 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
625 self.partition.walk_part(out.reborrow())?;
626 if !self.partition.is_empty() && !self.order.is_empty() {
627 out.push_sql(" ");
628 }
629 self.order.walk_part(out.reborrow())?;
630 if (!self.partition.is_empty() || !self.order.is_empty()) && !self.frame.is_empty() {
631 out.push_sql(" ");
632 }
633 self.frame.walk_part(out.reborrow())?;
634 Ok(())
635 }
636}
637
638impl<Q, Predicate> QueryFragment<ClickHouse> for QualifyQuery<Q, Predicate>
639where
640 Q: QueryFragment<ClickHouse>,
641 Predicate: QueryFragment<ClickHouse>,
642{
643 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
644 self.query.walk_ast(out.reborrow())?;
645 out.push_sql(" QUALIFY ");
646 self.predicate.walk_ast(out.reborrow())?;
647 Ok(())
648 }
649}
650
651trait WindowBindings {
652 fn is_empty(&self) -> bool;
653 fn walk_bindings<'b>(&'b self, out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()>;
654}
655
656impl WindowBindings for NoWindowBindings {
657 fn is_empty(&self) -> bool {
658 true
659 }
660
661 fn walk_bindings<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
662 Ok(())
663 }
664}
665
666impl<Tail, Spec> WindowBindings for WindowBinding<Tail, Spec>
667where
668 Tail: WindowBindings,
669 Spec: QueryFragment<ClickHouse>,
670{
671 fn is_empty(&self) -> bool {
672 false
673 }
674
675 fn walk_bindings<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
676 self.tail.walk_bindings(out.reborrow())?;
677 if !self.tail.is_empty() {
678 out.push_sql(", ");
679 }
680 validate_bare_identifier(&self.name, "window name")?;
681 out.push_identifier(&self.name)?;
682 out.push_sql(" AS (");
683 self.spec.walk_ast(out.reborrow())?;
684 out.push_sql(")");
685 Ok(())
686 }
687}
688
689impl<Q, Bindings> QueryFragment<ClickHouse> for WindowQuery<Q, Bindings>
690where
691 Q: QueryFragment<ClickHouse>,
692 Bindings: WindowBindings,
693{
694 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
695 self.query.walk_ast(out.reborrow())?;
696 if !self.bindings.is_empty() {
697 out.push_sql(" WINDOW ");
698 self.bindings.walk_bindings(out.reborrow())?;
699 }
700 Ok(())
701 }
702}
703
704fn validate_bare_identifier(value: &str, kind: &str) -> QueryResult<()> {
705 let mut chars = value.chars();
706 let Some(first) = chars.next() else {
707 return Err(Error::QueryBuilderError(
708 format!("empty ClickHouse {kind}").into(),
709 ));
710 };
711 if !(first == '_' || first.is_ascii_alphabetic()) {
712 return Err(Error::QueryBuilderError(
713 format!("invalid ClickHouse {kind}: {value:?}").into(),
714 ));
715 }
716 if chars.any(|ch| !(ch == '_' || ch.is_ascii_alphanumeric())) {
717 return Err(Error::QueryBuilderError(
718 format!("invalid ClickHouse {kind}: {value:?}").into(),
719 ));
720 }
721 Ok(())
722}