Skip to main content

diesel_clickhouse/
window.rs

1//! ClickHouse window functions, inline window specs, named windows, and QUALIFY.
2
3use diesel::backend::Backend;
4use diesel::expression::functions::define_sql_function;
5use diesel::expression::{AppearsOnTable, Expression, SelectableExpression, ValidGrouping};
6use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId};
7use diesel::query_dsl::RunQueryDsl;
8use diesel::result::{Error, QueryResult};
9use diesel::sql_types::{BigInt, SingleValue, SqlType};
10
11use crate::backend::ClickHouse;
12
13// Window-capable functions. They are ordinary function-call AST nodes until a
14// caller wraps them with [`OverDsl::over_ch`] or [`OverDsl::over_window`].
15define_sql_function! {
16    /// `row_number()`.
17    #[sql_name = "row_number"]
18    fn row_number() -> BigInt;
19}
20
21define_sql_function! {
22    /// `rank()`.
23    #[sql_name = "rank"]
24    fn rank() -> BigInt;
25}
26
27define_sql_function! {
28    /// `dense_rank()`.
29    #[sql_name = "dense_rank"]
30    fn dense_rank() -> BigInt;
31}
32
33define_sql_function! {
34    /// `lag(expr)`.
35    #[sql_name = "lag"]
36    fn lag<T: SqlType + SingleValue>(expr: T) -> T;
37}
38
39define_sql_function! {
40    /// `lead(expr)`.
41    #[sql_name = "lead"]
42    fn lead<T: SqlType + SingleValue>(expr: T) -> T;
43}
44
45define_sql_function! {
46    /// `lagInFrame(expr, offset, default)`.
47    #[sql_name = "lagInFrame"]
48    fn lag_in_frame<T: SqlType + SingleValue>(expr: T, offset: BigInt, default: T) -> T;
49}
50
51define_sql_function! {
52    /// `leadInFrame(expr, offset, default)`.
53    #[sql_name = "leadInFrame"]
54    fn lead_in_frame<T: SqlType + SingleValue>(expr: T, offset: BigInt, default: T) -> T;
55}
56
57define_sql_function! {
58    /// `first_value(expr)`.
59    #[sql_name = "first_value"]
60    fn first_value<T: SqlType + SingleValue>(expr: T) -> T;
61}
62
63define_sql_function! {
64    /// `last_value(expr)`.
65    #[sql_name = "last_value"]
66    fn last_value<T: SqlType + SingleValue>(expr: T) -> T;
67}
68
69/// Build a window spec with `PARTITION BY expr`.
70pub fn partition_by<Expr>(expr: Expr) -> WindowSpec<WindowPartition<Expr>>
71where
72    Expr: Expression,
73{
74    WindowSpec {
75        partition: WindowPartition(expr),
76        order: NoWindowOrder,
77        frame: NoWindowFrame,
78    }
79}
80
81/// Build a window spec with `ORDER BY expr` and no partition key.
82pub fn window_order_by<Expr>(expr: Expr) -> WindowSpec<NoWindowPartition, WindowOrder<Expr>>
83where
84    Expr: Expression,
85{
86    WindowSpec {
87        partition: NoWindowPartition,
88        order: WindowOrder(expr),
89        frame: NoWindowFrame,
90    }
91}
92
93/// Append `QUALIFY predicate` to a query.
94pub fn qualify<Q, Predicate>(query: Q, predicate: Predicate) -> QualifyQuery<Q, Predicate>
95where
96    Predicate: Expression,
97{
98    QualifyQuery { query, predicate }
99}
100
101/// Append `WINDOW name AS (spec)` to a query.
102pub fn window<Q, Spec>(
103    query: Q,
104    name: impl Into<String>,
105    spec: Spec,
106) -> WindowQuery<Q, WindowBinding<NoWindowBindings, Spec>> {
107    WindowQuery {
108        query,
109        bindings: WindowBinding {
110            tail: NoWindowBindings,
111            name: name.into(),
112            spec,
113        },
114    }
115}
116
117/// `expr OVER (...)` wrapper.
118#[derive(Debug, Clone, Copy)]
119pub struct Over<Expr, Spec> {
120    expr: Expr,
121    spec: Spec,
122}
123
124/// `expr OVER window_name` wrapper.
125#[derive(Debug, Clone)]
126pub struct OverWindow<Expr> {
127    expr: Expr,
128    name: String,
129}
130
131/// Fluent `.over_ch(spec)` and `.over_window(name)` helpers for window functions.
132pub trait OverDsl: Expression + Sized {
133    /// Render `self OVER (spec)`.
134    ///
135    /// The `_ch` suffix avoids a name collision with Diesel 2.3's no-argument
136    /// `.over()` helper while keeping ClickHouse window specifications fluent.
137    fn over_ch<Spec>(self, spec: Spec) -> Over<Self, Spec> {
138        Over { expr: self, spec }
139    }
140
141    /// Render `self OVER window_name`.
142    fn over_window(self, name: impl Into<String>) -> OverWindow<Self> {
143        OverWindow {
144            expr: self,
145            name: name.into(),
146        }
147    }
148}
149
150impl<T> OverDsl for T where T: Expression {}
151
152/// Inline window specification.
153#[derive(Debug, Clone, Copy)]
154pub struct WindowSpec<Partition = NoWindowPartition, Order = NoWindowOrder, Frame = NoWindowFrame> {
155    partition: Partition,
156    order: Order,
157    frame: Frame,
158}
159
160/// Missing `PARTITION BY` part.
161#[derive(Debug, Clone, Copy, Default)]
162pub struct NoWindowPartition;
163
164/// `PARTITION BY expr` part.
165#[derive(Debug, Clone, Copy)]
166pub struct WindowPartition<Expr>(Expr);
167
168/// Missing `ORDER BY` part.
169#[derive(Debug, Clone, Copy, Default)]
170pub struct NoWindowOrder;
171
172/// `ORDER BY expr` part.
173#[derive(Debug, Clone, Copy)]
174pub struct WindowOrder<Expr>(Expr);
175
176/// Missing frame clause.
177#[derive(Debug, Clone, Copy, Default)]
178pub struct NoWindowFrame;
179
180/// Window frame units.
181#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
182pub enum WindowFrameUnits {
183    /// `ROWS BETWEEN ...` counts physical rows relative to the current row.
184    Rows,
185    /// `RANGE BETWEEN ...` groups rows by distance in the ordering value.
186    Range,
187}
188
189/// One boundary in a ClickHouse window frame.
190#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
191pub enum WindowFrameBound {
192    /// `UNBOUNDED PRECEDING`.
193    UnboundedPreceding,
194    /// `n PRECEDING`.
195    Preceding(u64),
196    /// `CURRENT ROW`.
197    CurrentRow,
198    /// `n FOLLOWING`.
199    Following(u64),
200    /// `UNBOUNDED FOLLOWING`.
201    UnboundedFollowing,
202}
203
204/// `ROWS` or `RANGE` frame clause for a window specification.
205#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
206pub struct WindowFrame {
207    units: WindowFrameUnits,
208    start: WindowFrameBound,
209    end: WindowFrameBound,
210}
211
212/// Backwards-compatible name for the original frame helper return type.
213pub type RowsBetweenUnboundedPrecedingAndCurrentRow = WindowFrame;
214
215impl WindowFrameBound {
216    /// Build `n PRECEDING`.
217    pub fn preceding(offset: u64) -> Self {
218        Self::Preceding(offset)
219    }
220
221    /// Build `n FOLLOWING`.
222    pub fn following(offset: u64) -> Self {
223        Self::Following(offset)
224    }
225}
226
227impl<Partition, Order, Frame> WindowSpec<Partition, Order, Frame> {
228    /// Add or replace the `ORDER BY` expression.
229    pub fn order_by<Expr>(self, expr: Expr) -> WindowSpec<Partition, WindowOrder<Expr>, Frame>
230    where
231        Expr: Expression,
232    {
233        WindowSpec {
234            partition: self.partition,
235            order: WindowOrder(expr),
236            frame: self.frame,
237        }
238    }
239
240    /// Add a `ROWS BETWEEN start AND end` frame.
241    pub fn rows_between(
242        self,
243        start: WindowFrameBound,
244        end: WindowFrameBound,
245    ) -> WindowSpec<Partition, Order, WindowFrame> {
246        self.with_frame(WindowFrame {
247            units: WindowFrameUnits::Rows,
248            start,
249            end,
250        })
251    }
252
253    /// Add a `RANGE BETWEEN start AND end` frame.
254    pub fn range_between(
255        self,
256        start: WindowFrameBound,
257        end: WindowFrameBound,
258    ) -> WindowSpec<Partition, Order, WindowFrame> {
259        self.with_frame(WindowFrame {
260            units: WindowFrameUnits::Range,
261            start,
262            end,
263        })
264    }
265
266    /// Add `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
267    pub fn rows_between_unbounded_preceding_and_current_row(
268        self,
269    ) -> WindowSpec<Partition, Order, WindowFrame> {
270        self.rows_between(
271            WindowFrameBound::UnboundedPreceding,
272            WindowFrameBound::CurrentRow,
273        )
274    }
275
276    /// Add `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`.
277    pub fn rows_between_unbounded_preceding_and_unbounded_following(
278        self,
279    ) -> WindowSpec<Partition, Order, WindowFrame> {
280        self.rows_between(
281            WindowFrameBound::UnboundedPreceding,
282            WindowFrameBound::UnboundedFollowing,
283        )
284    }
285
286    /// Add `ROWS BETWEEN n PRECEDING AND CURRENT ROW`.
287    pub fn rows_between_preceding_and_current_row(
288        self,
289        preceding: u64,
290    ) -> WindowSpec<Partition, Order, WindowFrame> {
291        self.rows_between(
292            WindowFrameBound::Preceding(preceding),
293            WindowFrameBound::CurrentRow,
294        )
295    }
296
297    /// Add `ROWS BETWEEN n PRECEDING AND m FOLLOWING`.
298    pub fn rows_between_preceding_and_following(
299        self,
300        preceding: u64,
301        following: u64,
302    ) -> WindowSpec<Partition, Order, WindowFrame> {
303        self.rows_between(
304            WindowFrameBound::Preceding(preceding),
305            WindowFrameBound::Following(following),
306        )
307    }
308
309    /// Add `ROWS BETWEEN CURRENT ROW AND n FOLLOWING`.
310    pub fn rows_between_current_row_and_following(
311        self,
312        following: u64,
313    ) -> WindowSpec<Partition, Order, WindowFrame> {
314        self.rows_between(
315            WindowFrameBound::CurrentRow,
316            WindowFrameBound::Following(following),
317        )
318    }
319
320    /// Add `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
321    pub fn range_between_unbounded_preceding_and_current_row(
322        self,
323    ) -> WindowSpec<Partition, Order, WindowFrame> {
324        self.range_between(
325            WindowFrameBound::UnboundedPreceding,
326            WindowFrameBound::CurrentRow,
327        )
328    }
329
330    /// Add `RANGE BETWEEN n PRECEDING AND CURRENT ROW`.
331    pub fn range_between_preceding_and_current_row(
332        self,
333        preceding: u64,
334    ) -> WindowSpec<Partition, Order, WindowFrame> {
335        self.range_between(
336            WindowFrameBound::Preceding(preceding),
337            WindowFrameBound::CurrentRow,
338        )
339    }
340
341    fn with_frame<NewFrame>(self, frame: NewFrame) -> WindowSpec<Partition, Order, NewFrame> {
342        WindowSpec {
343            partition: self.partition,
344            order: self.order,
345            frame,
346        }
347    }
348}
349
350/// Query wrapper that appends `QUALIFY predicate`.
351#[derive(Debug, Clone, Copy)]
352pub struct QualifyQuery<Q, Predicate> {
353    query: Q,
354    predicate: Predicate,
355}
356
357/// Query wrapper that appends named window definitions.
358#[derive(Debug, Clone, Copy)]
359pub struct WindowQuery<Q, Bindings> {
360    query: Q,
361    bindings: Bindings,
362}
363
364/// Empty named window binding list.
365#[derive(Debug, Clone, Copy, Default)]
366pub struct NoWindowBindings;
367
368/// One named window binding plus previously declared bindings.
369#[derive(Debug, Clone)]
370pub struct WindowBinding<Tail, Spec> {
371    tail: Tail,
372    name: String,
373    spec: Spec,
374}
375
376impl<Q, Bindings> WindowQuery<Q, Bindings> {
377    /// Add another named window definition.
378    pub fn and_window<Spec>(
379        self,
380        name: impl Into<String>,
381        spec: Spec,
382    ) -> WindowQuery<Q, WindowBinding<Bindings, Spec>> {
383        WindowQuery {
384            query: self.query,
385            bindings: WindowBinding {
386                tail: self.bindings,
387                name: name.into(),
388                spec,
389            },
390        }
391    }
392}
393
394impl<Expr, Spec> Expression for Over<Expr, Spec>
395where
396    Expr: Expression,
397{
398    type SqlType = Expr::SqlType;
399}
400
401impl<Expr> Expression for OverWindow<Expr>
402where
403    Expr: Expression,
404{
405    type SqlType = Expr::SqlType;
406}
407
408impl<Expr, Spec, GB> ValidGrouping<GB> for Over<Expr, Spec>
409where
410    Expr: ValidGrouping<GB>,
411{
412    type IsAggregate = Expr::IsAggregate;
413}
414
415impl<Expr, GB> ValidGrouping<GB> for OverWindow<Expr>
416where
417    Expr: ValidGrouping<GB>,
418{
419    type IsAggregate = Expr::IsAggregate;
420}
421
422impl<Expr, Spec, QS> AppearsOnTable<QS> for Over<Expr, Spec>
423where
424    Expr: AppearsOnTable<QS>,
425    Self: Expression,
426{
427}
428
429impl<Expr, QS> AppearsOnTable<QS> for OverWindow<Expr>
430where
431    Expr: AppearsOnTable<QS>,
432    Self: Expression,
433{
434}
435
436impl<Expr, Spec, QS> SelectableExpression<QS> for Over<Expr, Spec> where Self: AppearsOnTable<QS> {}
437impl<Expr, QS> SelectableExpression<QS> for OverWindow<Expr> where Self: AppearsOnTable<QS> {}
438
439impl<Expr, Spec> QueryId for Over<Expr, Spec>
440where
441    Expr: QueryId,
442{
443    type QueryId = Over<Expr::QueryId, ()>;
444    const HAS_STATIC_QUERY_ID: bool = false;
445}
446
447impl<Expr> QueryId for OverWindow<Expr>
448where
449    Expr: QueryId,
450{
451    type QueryId = OverWindow<Expr::QueryId>;
452    const HAS_STATIC_QUERY_ID: bool = false;
453}
454
455impl<Q, Predicate> Query for QualifyQuery<Q, Predicate>
456where
457    Q: Query,
458{
459    type SqlType = Q::SqlType;
460}
461
462impl<Q, Predicate, Conn> RunQueryDsl<Conn> for QualifyQuery<Q, Predicate> {}
463
464impl<Q, Predicate> QueryId for QualifyQuery<Q, Predicate>
465where
466    Q: QueryId,
467{
468    type QueryId = QualifyQuery<Q::QueryId, ()>;
469    const HAS_STATIC_QUERY_ID: bool = false;
470}
471
472impl<Q, Bindings> Query for WindowQuery<Q, Bindings>
473where
474    Q: Query,
475{
476    type SqlType = Q::SqlType;
477}
478
479impl<Q, Bindings, Conn> RunQueryDsl<Conn> for WindowQuery<Q, Bindings> {}
480
481impl<Q, Bindings> QueryId for WindowQuery<Q, Bindings>
482where
483    Q: QueryId,
484{
485    type QueryId = WindowQuery<Q::QueryId, ()>;
486    const HAS_STATIC_QUERY_ID: bool = false;
487}
488
489impl<Expr, Spec, DB> QueryFragment<DB> for Over<Expr, Spec>
490where
491    DB: Backend,
492    Expr: QueryFragment<DB>,
493    Spec: QueryFragment<DB>,
494{
495    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
496        self.expr.walk_ast(out.reborrow())?;
497        out.push_sql(" OVER (");
498        self.spec.walk_ast(out.reborrow())?;
499        out.push_sql(")");
500        Ok(())
501    }
502}
503
504impl<Expr> QueryFragment<ClickHouse> for OverWindow<Expr>
505where
506    Expr: QueryFragment<ClickHouse>,
507{
508    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
509        self.expr.walk_ast(out.reborrow())?;
510        validate_bare_identifier(&self.name, "window name")?;
511        out.push_sql(" OVER ");
512        out.push_identifier(&self.name)?;
513        Ok(())
514    }
515}
516
517trait WindowSpecPart {
518    fn is_empty(&self) -> bool;
519    fn walk_part<'b>(&'b self, out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()>;
520}
521
522impl WindowSpecPart for NoWindowPartition {
523    fn is_empty(&self) -> bool {
524        true
525    }
526
527    fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
528        Ok(())
529    }
530}
531
532impl<Expr> WindowSpecPart for WindowPartition<Expr>
533where
534    Expr: QueryFragment<ClickHouse>,
535{
536    fn is_empty(&self) -> bool {
537        false
538    }
539
540    fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
541        out.push_sql("PARTITION BY ");
542        self.0.walk_ast(out.reborrow())?;
543        Ok(())
544    }
545}
546
547impl WindowSpecPart for NoWindowOrder {
548    fn is_empty(&self) -> bool {
549        true
550    }
551
552    fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
553        Ok(())
554    }
555}
556
557impl<Expr> WindowSpecPart for WindowOrder<Expr>
558where
559    Expr: QueryFragment<ClickHouse>,
560{
561    fn is_empty(&self) -> bool {
562        false
563    }
564
565    fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
566        out.push_sql("ORDER BY ");
567        self.0.walk_ast(out.reborrow())?;
568        Ok(())
569    }
570}
571
572impl WindowSpecPart for NoWindowFrame {
573    fn is_empty(&self) -> bool {
574        true
575    }
576
577    fn walk_part<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
578        Ok(())
579    }
580}
581
582impl WindowSpecPart for WindowFrame {
583    fn is_empty(&self) -> bool {
584        false
585    }
586
587    fn walk_part<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
588        out.push_sql(match self.units {
589            WindowFrameUnits::Rows => "ROWS BETWEEN ",
590            WindowFrameUnits::Range => "RANGE BETWEEN ",
591        });
592        push_frame_bound(&mut out, self.start);
593        out.push_sql(" AND ");
594        push_frame_bound(&mut out, self.end);
595        Ok(())
596    }
597}
598
599fn push_frame_bound<DB>(out: &mut AstPass<'_, '_, DB>, bound: WindowFrameBound)
600where
601    DB: Backend,
602{
603    match bound {
604        WindowFrameBound::UnboundedPreceding => out.push_sql("UNBOUNDED PRECEDING"),
605        WindowFrameBound::Preceding(offset) => {
606            out.push_sql(&offset.to_string());
607            out.push_sql(" PRECEDING");
608        }
609        WindowFrameBound::CurrentRow => out.push_sql("CURRENT ROW"),
610        WindowFrameBound::Following(offset) => {
611            out.push_sql(&offset.to_string());
612            out.push_sql(" FOLLOWING");
613        }
614        WindowFrameBound::UnboundedFollowing => out.push_sql("UNBOUNDED FOLLOWING"),
615    }
616}
617
618impl<Partition, Order, Frame> QueryFragment<ClickHouse> for WindowSpec<Partition, Order, Frame>
619where
620    Partition: WindowSpecPart,
621    Order: WindowSpecPart,
622    Frame: WindowSpecPart,
623{
624    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
625        self.partition.walk_part(out.reborrow())?;
626        if !self.partition.is_empty() && !self.order.is_empty() {
627            out.push_sql(" ");
628        }
629        self.order.walk_part(out.reborrow())?;
630        if (!self.partition.is_empty() || !self.order.is_empty()) && !self.frame.is_empty() {
631            out.push_sql(" ");
632        }
633        self.frame.walk_part(out.reborrow())?;
634        Ok(())
635    }
636}
637
638impl<Q, Predicate> QueryFragment<ClickHouse> for QualifyQuery<Q, Predicate>
639where
640    Q: QueryFragment<ClickHouse>,
641    Predicate: QueryFragment<ClickHouse>,
642{
643    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
644        self.query.walk_ast(out.reborrow())?;
645        out.push_sql(" QUALIFY ");
646        self.predicate.walk_ast(out.reborrow())?;
647        Ok(())
648    }
649}
650
651trait WindowBindings {
652    fn is_empty(&self) -> bool;
653    fn walk_bindings<'b>(&'b self, out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()>;
654}
655
656impl WindowBindings for NoWindowBindings {
657    fn is_empty(&self) -> bool {
658        true
659    }
660
661    fn walk_bindings<'b>(&'b self, _out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
662        Ok(())
663    }
664}
665
666impl<Tail, Spec> WindowBindings for WindowBinding<Tail, Spec>
667where
668    Tail: WindowBindings,
669    Spec: QueryFragment<ClickHouse>,
670{
671    fn is_empty(&self) -> bool {
672        false
673    }
674
675    fn walk_bindings<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
676        self.tail.walk_bindings(out.reborrow())?;
677        if !self.tail.is_empty() {
678            out.push_sql(", ");
679        }
680        validate_bare_identifier(&self.name, "window name")?;
681        out.push_identifier(&self.name)?;
682        out.push_sql(" AS (");
683        self.spec.walk_ast(out.reborrow())?;
684        out.push_sql(")");
685        Ok(())
686    }
687}
688
689impl<Q, Bindings> QueryFragment<ClickHouse> for WindowQuery<Q, Bindings>
690where
691    Q: QueryFragment<ClickHouse>,
692    Bindings: WindowBindings,
693{
694    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, ClickHouse>) -> QueryResult<()> {
695        self.query.walk_ast(out.reborrow())?;
696        if !self.bindings.is_empty() {
697            out.push_sql(" WINDOW ");
698            self.bindings.walk_bindings(out.reborrow())?;
699        }
700        Ok(())
701    }
702}
703
704fn validate_bare_identifier(value: &str, kind: &str) -> QueryResult<()> {
705    let mut chars = value.chars();
706    let Some(first) = chars.next() else {
707        return Err(Error::QueryBuilderError(
708            format!("empty ClickHouse {kind}").into(),
709        ));
710    };
711    if !(first == '_' || first.is_ascii_alphabetic()) {
712        return Err(Error::QueryBuilderError(
713            format!("invalid ClickHouse {kind}: {value:?}").into(),
714        ));
715    }
716    if chars.any(|ch| !(ch == '_' || ch.is_ascii_alphanumeric())) {
717        return Err(Error::QueryBuilderError(
718            format!("invalid ClickHouse {kind}: {value:?}").into(),
719        ));
720    }
721    Ok(())
722}