kassandra/cql/execution/
scan.rs

1use std::ops::RangeBounds;
2
3use bytes::{Bytes, BytesMut};
4use serde::Serialize;
5use tracing::{instrument, Level};
6
7use crate::{
8    cql::{
9        self,
10        execution::{
11            selector::{self, ColumnsSelector},
12            Executor,
13        },
14        value::{
15            ClusteringKeyValue, ClusteringKeyValueRange, PartitionKeyValue, PartitionKeyValueRange,
16        },
17    },
18    frame::{
19        response::{
20            error::Error,
21            result::{QueryResult, ResultMetadata, Row, Rows},
22        },
23        value::PagingState,
24    },
25};
26
27#[derive(Debug, Clone, Serialize)]
28pub struct ScanNode {
29    pub keyspace: String,
30    pub table: String,
31    pub selector: ColumnsSelector,
32    pub metadata: ResultMetadata,
33    pub clustering_key_start: ClusteringKeyValueRange,
34    pub partition_range: PartitionKeyValueRange,
35    pub limit: usize,
36    pub result_page_size: usize,
37}
38
39impl<E: cql::Engine> Executor<E> for ScanNode {
40    #[instrument(level = Level::TRACE, skip(engine), err)]
41    fn execute(self: Box<Self>, engine: &mut E) -> Result<QueryResult, Error> {
42        let mut scan = engine
43            .scan(&self.keyspace, &self.table, self.partition_range)?
44            .take(self.limit);
45
46        let mut rows = vec![];
47        let mut first_partition = None;
48
49        let last_row_entry = loop {
50            let Some(next_entry) = scan.next() else {
51                break None;
52            };
53            if rows.len() >= self.result_page_size {
54                break Some(next_entry);
55            };
56            if first_partition.is_none() {
57                first_partition = Some(next_entry.partition.clone());
58            }
59
60            if Some(&next_entry.partition) == first_partition.as_ref()
61                && !self.clustering_key_start.contains(&next_entry.clustering)
62            {
63                continue;
64            }
65
66            rows.push(Row {
67                columns: selector::filter(next_entry.row, &self.selector),
68            });
69        };
70
71        drop(scan);
72
73        let metadata = if let Some(last_row_entry) = last_row_entry {
74            let state = PagingState::new(
75                Some(encode_partition_key(&last_row_entry.partition)),
76                Some(encode_row_marker(&last_row_entry.clustering)),
77                self.limit - rows.len(),
78                1,
79            );
80
81            ResultMetadata {
82                paging_state: Some(state),
83                ..self.metadata
84            }
85        } else {
86            self.metadata
87        };
88
89        let rows = Rows { metadata, rows };
90
91        Ok(QueryResult::Rows(rows))
92    }
93}
94
95fn encode_row_marker(value: &ClusteringKeyValue) -> Bytes {
96    use crate::frame::write;
97    let mut buf = BytesMut::new();
98    write::clustering_value(&mut buf, value);
99
100    buf.freeze()
101}
102
103fn encode_partition_key(value: &PartitionKeyValue) -> Bytes {
104    use crate::frame::write;
105    let mut buf = BytesMut::new();
106    write::partition_value(&mut buf, value);
107
108    buf.freeze()
109}