1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use crate::{
Result,
engine::exec::{Action, Exec, Output, VarId},
};
use toasty_core::{
driver::{ExecResponse, Rows, operation},
schema::db::{ColumnId, IndexId, TableId},
stmt,
};
/// Plan action describing a key-based query against a single table.
///
/// Executed by `Exec::action_query_pk`, which splits `pk_filter` into one or
/// more filters, issues one driver `operation::QueryPk` per filter, and stores
/// the combined rows in `output`.
#[derive(Debug)]
pub(crate) struct QueryPk {
/// Variable whose collected value is substituted into `pk_filter` before the
/// query runs. When `None`, `pk_filter` is used as written.
pub input: Option<VarId>,
/// Where to store the result; its `var` and `num_uses` are handed to the
/// executor's variable store.
pub output: Output,
/// Table to query. Also passed to the executor's filter-splitting step.
pub table: TableId,
/// Optional index to query. None = primary key, Some(id) = secondary index
pub index: Option<IndexId>,
/// Columns to get; forwarded to the driver as the operation's `select` list.
pub columns: Vec<ColumnId>,
/// How to filter the index. The executor may split this expression into
/// several filters, each of which becomes its own driver operation.
pub pk_filter: stmt::Expr,
/// Filter to pass to the database. A clone is forwarded with every driver
/// operation issued for this action.
pub row_filter: Option<stmt::Expr>,
/// Maximum number of items to return. The value is forwarded unchanged to
/// every driver operation, so when `pk_filter` splits into several filters
/// the cap applies to each operation rather than to the combined result.
pub limit: Option<i64>,
/// Sort key ordering direction, forwarded to the driver as-is.
pub order: Option<stmt::Direction>,
/// Cursor for resuming a paginated query. The executor asserts that this is
/// `None` whenever `pk_filter` splits into more than one filter.
pub cursor: Option<stmt::Value>,
}
impl Exec<'_> {
    /// Runs a [`QueryPk`] action and stores the gathered rows in the action's
    /// output variable.
    ///
    /// The key filter is first specialised with the action's input (if any),
    /// then split into per-partition filters. Each filter is sent to the
    /// driver as its own `operation::QueryPk`, and the rows of every response
    /// are concatenated in filter order.
    ///
    /// A continuation cursor is reported only when at most one filter was
    /// produced; with several filters the driver's cursor would describe just
    /// one of the result sets, so it is dropped.
    ///
    /// # Errors
    ///
    /// Propagates any error from collecting the input, executing a driver
    /// operation, or draining a response's value stream.
    ///
    /// # Panics
    ///
    /// Panics if `action.cursor` is set while the key filter splits into more
    /// than one filter.
    pub(super) async fn action_query_pk(&mut self, action: &QueryPk) -> Result<()> {
        // Specialise the key filter with runtime input, when the plan has one.
        let mut key_filter = action.pk_filter.clone();
        if let Some(var) = &action.input {
            let args = self.collect_input(&[*var]).await?;
            key_filter.substitute(&args);
        }

        let partition_filters = self.split_filter(key_filter, action.table);

        // A cursor (incoming or outgoing) only makes sense for a single
        // partition key query.
        let single_partition = partition_filters.len() <= 1;
        assert!(
            action.cursor.is_none() || single_partition,
            "cursor-based pagination with multiple partition filters is not supported"
        );

        let mut rows = Vec::new();
        let mut resume_cursor = None;

        for partition_filter in partition_filters {
            let op = operation::QueryPk {
                table: action.table,
                index: action.index,
                select: action.columns.clone(),
                pk_filter: partition_filter,
                filter: action.row_filter.clone(),
                limit: action.limit,
                order: action.order,
                cursor: action.cursor.clone(),
            };

            let res = self
                .connection
                .exec(&self.engine.schema, op.into())
                .await?;

            // Keep the driver's cursor only for single-filter queries; leave
            // any previously recorded value alone otherwise.
            match res.next_cursor {
                Some(next) if single_partition => resume_cursor = Some(next),
                _ => {}
            }

            let batch = res.values.into_value_stream().collect().await?;
            rows.extend(batch);
        }

        let response = ExecResponse {
            values: Rows::Stream(stmt::ValueStream::from_vec(rows)),
            next_cursor: resume_cursor,
            prev_cursor: None,
        };
        self.vars
            .store(action.output.var, action.output.num_uses, response);

        Ok(())
    }
}
impl From<QueryPk> for Action {
fn from(value: QueryPk) -> Self {
Action::QueryPk(value)
}
}