kassandra/cql/execution/
scan.rs1use std::ops::RangeBounds;
2
3use bytes::{Bytes, BytesMut};
4use serde::Serialize;
5use tracing::{instrument, Level};
6
7use crate::{
8 cql::{
9 self,
10 execution::{
11 selector::{self, ColumnsSelector},
12 Executor,
13 },
14 value::{
15 ClusteringKeyValue, ClusteringKeyValueRange, PartitionKeyValue, PartitionKeyValueRange,
16 },
17 },
18 frame::{
19 response::{
20 error::Error,
21 result::{QueryResult, ResultMetadata, Row, Rows},
22 },
23 value::PagingState,
24 },
25};
26
27#[derive(Debug, Clone, Serialize)]
28pub struct ScanNode {
29 pub keyspace: String,
30 pub table: String,
31 pub selector: ColumnsSelector,
32 pub metadata: ResultMetadata,
33 pub clustering_key_start: ClusteringKeyValueRange,
34 pub partition_range: PartitionKeyValueRange,
35 pub limit: usize,
36 pub result_page_size: usize,
37}
38
39impl<E: cql::Engine> Executor<E> for ScanNode {
40 #[instrument(level = Level::TRACE, skip(engine), err)]
41 fn execute(self: Box<Self>, engine: &mut E) -> Result<QueryResult, Error> {
42 let mut scan = engine
43 .scan(&self.keyspace, &self.table, self.partition_range)?
44 .take(self.limit);
45
46 let mut rows = vec![];
47 let mut first_partition = None;
48
49 let last_row_entry = loop {
50 let Some(next_entry) = scan.next() else {
51 break None;
52 };
53 if rows.len() >= self.result_page_size {
54 break Some(next_entry);
55 };
56 if first_partition.is_none() {
57 first_partition = Some(next_entry.partition.clone());
58 }
59
60 if Some(&next_entry.partition) == first_partition.as_ref()
61 && !self.clustering_key_start.contains(&next_entry.clustering)
62 {
63 continue;
64 }
65
66 rows.push(Row {
67 columns: selector::filter(next_entry.row, &self.selector),
68 });
69 };
70
71 drop(scan);
72
73 let metadata = if let Some(last_row_entry) = last_row_entry {
74 let state = PagingState::new(
75 Some(encode_partition_key(&last_row_entry.partition)),
76 Some(encode_row_marker(&last_row_entry.clustering)),
77 self.limit - rows.len(),
78 1,
79 );
80
81 ResultMetadata {
82 paging_state: Some(state),
83 ..self.metadata
84 }
85 } else {
86 self.metadata
87 };
88
89 let rows = Rows { metadata, rows };
90
91 Ok(QueryResult::Rows(rows))
92 }
93}
94
95fn encode_row_marker(value: &ClusteringKeyValue) -> Bytes {
96 use crate::frame::write;
97 let mut buf = BytesMut::new();
98 write::clustering_value(&mut buf, value);
99
100 buf.freeze()
101}
102
103fn encode_partition_key(value: &PartitionKeyValue) -> Bytes {
104 use crate::frame::write;
105 let mut buf = BytesMut::new();
106 write::partition_value(&mut buf, value);
107
108 buf.freeze()
109}