1use std::collections::HashMap;
6use std::fmt::Display;
7use std::str::FromStr;
8
9use anyhow::bail;
10use p2panda_rs::document::DocumentViewId;
11use p2panda_rs::operation::OperationValue;
12use p2panda_rs::schema::{FieldName, Schema, SchemaId};
13use p2panda_rs::storage_provider::error::DocumentStorageError;
14use sqlx::query::QueryAs;
15use sqlx::query_as;
16
17use crate::db::models::utils::parse_document_view_field_rows;
18use crate::db::models::{DocumentViewFieldRow, QueryRow};
19use crate::db::query::{
20 ApplicationFields, Cursor, Direction, Field, Filter, FilterBy, FilterSetting, LowerBound,
21 MetaField, Order, Pagination, PaginationField, Select, UpperBound,
22};
23use crate::db::stores::OperationCursor;
24use crate::db::types::StorageDocument;
25use crate::db::{Pool, SqlStore};
26
27#[derive(Debug)]
29pub struct RelationList {
30 pub root_view_id: DocumentViewId,
32
33 pub field: FieldName,
35
36 pub list_type: RelationListType,
38}
39
40#[derive(Debug)]
41pub enum RelationListType {
42 Pinned,
43 Unpinned,
44}
45
46impl RelationList {
47 pub fn new_unpinned(root_view_id: &DocumentViewId, field: &str) -> Self {
48 Self {
49 root_view_id: root_view_id.to_owned(),
50 field: field.to_string(),
51 list_type: RelationListType::Unpinned,
52 }
53 }
54
55 pub fn new_pinned(root_view_id: &DocumentViewId, field: &str) -> Self {
56 Self {
57 root_view_id: root_view_id.to_owned(),
58 field: field.to_string(),
59 list_type: RelationListType::Pinned,
60 }
61 }
62}
63
64#[derive(Debug, Clone, Eq, PartialEq)]
70pub struct PaginationCursor {
71 pub operation_cursor: OperationCursor,
73
74 pub root_operation_cursor: Option<OperationCursor>,
76
77 pub root_view_id: Option<DocumentViewId>,
80}
81
82impl PaginationCursor {
83 pub fn new(
84 operation_cursor: OperationCursor,
85 root_operation_cursor: Option<OperationCursor>,
86 root_view_id: Option<DocumentViewId>,
87 ) -> Self {
88 assert!(root_operation_cursor.is_some() == root_view_id.is_some());
89
90 Self {
91 operation_cursor,
92 root_operation_cursor,
93 root_view_id,
94 }
95 }
96}
97
98impl Display for PaginationCursor {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "{}", self.encode())
101 }
102}
103
104const CURSOR_SEPARATOR: char = '-';
107
108impl Cursor for PaginationCursor {
109 type Error = anyhow::Error;
110
111 fn decode(encoded: &str) -> Result<Self, Self::Error> {
112 let bytes = bs58::decode(encoded).into_vec()?;
113 let decoded = std::str::from_utf8(&bytes)?;
114
115 let parts: Vec<&str> = decoded.split(CURSOR_SEPARATOR).collect();
116 match parts.len() {
117 1 => Ok(Self::new(OperationCursor::from(parts[0]), None, None)),
118 3 => Ok(Self::new(
119 OperationCursor::from(parts[0]),
120 Some(OperationCursor::from(parts[1])),
121 Some(DocumentViewId::from_str(parts[2])?),
122 )),
123 _ => {
124 bail!("Invalid amount of cursor parts");
125 }
126 }
127 }
128
129 fn encode(&self) -> String {
130 bs58::encode(
131 format!(
132 "{}{}",
133 self.operation_cursor,
134 self.root_view_id
135 .as_ref()
136 .map_or("".to_string(), |view_id| format!(
137 "{}{}{}{}",
138 CURSOR_SEPARATOR,
139 self.root_operation_cursor
140 .as_ref()
141 .expect("Expect root_operation to be set when root_view_id is as well"),
142 CURSOR_SEPARATOR,
143 view_id
144 ))
145 )
146 .as_bytes(),
147 )
148 .into_string()
149 }
150}
151
152#[derive(Default, Clone, Debug)]
153pub struct PaginationData<C>
154where
155 C: Cursor,
156{
157 pub total_count: Option<u64>,
159
160 pub has_next_page: bool,
162
163 pub has_previous_page: bool,
165
166 pub start_cursor: Option<C>,
168
169 pub end_cursor: Option<C>,
171}
172
173pub type QueryResponse = (
174 PaginationData<PaginationCursor>,
175 Vec<(PaginationCursor, StorageDocument)>,
176);
177
178#[derive(Debug, Clone)]
181pub struct Query<C>
182where
183 C: Cursor,
184{
185 pub pagination: Pagination<C>,
186 pub select: Select,
187 pub filter: Filter,
188 pub order: Order,
189}
190
191impl<C> Query<C>
192where
193 C: Cursor,
194{
195 pub fn new(
196 pagination: &Pagination<C>,
197 select: &Select,
198 filter: &Filter,
199 order: &Order,
200 ) -> Self {
201 Self {
202 pagination: pagination.clone(),
203 select: select.clone(),
204 filter: filter.clone(),
205 order: order.clone(),
206 }
207 }
208}
209
210fn typecast_field_sql(
213 sql_field: &str,
214 field_name: &str,
215 schema: &Schema,
216 case_sensitive: bool,
217) -> String {
218 let field_type = schema
219 .fields()
220 .iter()
221 .find_map(|(schema_field_name, field_type)| {
222 if schema_field_name == field_name {
223 Some(field_type)
224 } else {
225 None
226 }
227 })
228 .unwrap_or_else(|| panic!("Field '{}' not given in Schema", field_name));
231
232 match field_type {
233 p2panda_rs::schema::FieldType::Integer => {
234 format!("CAST ({sql_field} AS INTEGER)")
235 }
236 p2panda_rs::schema::FieldType::Float => {
237 format!("CAST ({sql_field} AS REAL)")
238 }
239 _ => {
242 if case_sensitive {
243 sql_field.to_string()
244 } else {
245 format!("LOWER({sql_field})")
246 }
247 }
248 }
249}
250
251#[derive(Debug)]
253enum BindArgument {
254 String(String),
255 Integer(i64),
256 Float(f64),
257}
258
259fn bind_to_query<'q, O>(
261 mut query: QueryAs<'q, sqlx::Any, O, sqlx::any::AnyArguments<'q>>,
262 args: &'q Vec<BindArgument>,
263) -> QueryAs<'q, sqlx::Any, O, sqlx::any::AnyArguments<'q>> {
264 for arg in args {
265 query = match arg {
266 BindArgument::String(value) => query.bind(value),
267 BindArgument::Integer(value) => query.bind(value),
268 BindArgument::Float(value) => query.bind(value),
269 };
270 }
271
272 query
273}
274
275fn bind_arg(value: &OperationValue) -> Vec<BindArgument> {
277 match &value {
278 OperationValue::Boolean(value) => vec![BindArgument::String(
285 (if *value { "true" } else { "false" }).to_string(),
286 )],
287 OperationValue::Integer(value) => vec![BindArgument::Integer(value.to_owned())],
288 OperationValue::Float(value) => vec![BindArgument::Float(value.to_owned())],
289 OperationValue::String(value) => vec![BindArgument::String(value.to_owned())],
290 OperationValue::Relation(value) => {
291 vec![BindArgument::String(value.document_id().to_string())]
292 }
293 OperationValue::RelationList(value) => value
294 .iter()
295 .map(|document_id| BindArgument::String(document_id.to_string()))
296 .collect(),
297 OperationValue::PinnedRelation(value) => {
298 vec![BindArgument::String(value.view_id().to_string())]
299 }
300 OperationValue::PinnedRelationList(value) => value
301 .iter()
302 .map(|view_id| BindArgument::String(view_id.to_string()))
303 .collect(),
304 OperationValue::Bytes(value) => vec![BindArgument::String(hex::encode(value))],
305 }
306}
307
308fn cmp_sql(
313 sql_field: &str,
314 filter_setting: &FilterSetting,
315 args: &mut Vec<BindArgument>,
316) -> String {
317 match &filter_setting.by {
318 FilterBy::Element(value) => {
319 args.append(&mut bind_arg(value));
320
321 if !filter_setting.exclusive {
322 format!("{sql_field} = ${}", args.len())
323 } else {
324 format!("{sql_field} != ${}", args.len())
325 }
326 }
327 FilterBy::Set(values_vec) => {
328 let args_sql = values_vec
329 .iter()
330 .map(|value| {
331 args.append(&mut bind_arg(value));
332 format!("${}", args.len())
333 })
334 .collect::<Vec<String>>()
335 .join(",");
336
337 if !filter_setting.exclusive {
338 format!("{sql_field} IN ({})", args_sql)
339 } else {
340 format!("{sql_field} NOT IN ({})", args_sql)
341 }
342 }
343 FilterBy::Interval(lower_value, upper_value) => {
344 let mut values: Vec<String> = Vec::new();
345
346 match lower_value {
347 LowerBound::Unbounded => (),
348 LowerBound::Greater(value) => {
349 args.append(&mut bind_arg(value));
350 values.push(format!("{sql_field} > ${}", args.len()));
351 }
352 LowerBound::GreaterEqual(value) => {
353 args.append(&mut bind_arg(value));
354 values.push(format!("{sql_field} >= ${}", args.len()));
355 }
356 }
357
358 match upper_value {
359 UpperBound::Unbounded => (),
360 UpperBound::Lower(value) => {
361 args.append(&mut bind_arg(value));
362 values.push(format!("{sql_field} < ${}", args.len()));
363 }
364 UpperBound::LowerEqual(value) => {
365 args.append(&mut bind_arg(value));
366 values.push(format!("{sql_field} <= ${}", args.len()));
367 }
368 }
369
370 values.join(" AND ")
371 }
372 FilterBy::Contains(OperationValue::String(value)) => {
373 args.push(BindArgument::String(format!("%{value}%")));
374
375 if !filter_setting.exclusive {
376 format!("{sql_field} LIKE ${}", args.len())
377 } else {
378 format!("{sql_field} NOT LIKE ${}", args.len())
379 }
380 }
381 _ => panic!("Unsupported filter"),
382 }
383}
384
385fn concatenate_sql(items: &[Option<String>]) -> String {
387 items
388 .iter()
389 .filter_map(|item| item.as_ref().cloned())
390 .collect::<Vec<String>>()
391 .join(", ")
392}
393
394fn where_filter_sql(filter: &Filter, schema: &Schema) -> (String, Vec<BindArgument>) {
400 let mut args: Vec<BindArgument> = Vec::new();
401
402 let sql = filter
403 .iter()
404 .filter_map(|filter_setting| {
405 match &filter_setting.field {
406 Field::Meta(MetaField::Owner) => {
407 let filter_cmp = cmp_sql("operations_v1.public_key", filter_setting, &mut args);
408
409 Some(format!(
410 r#"
411 AND EXISTS (
412 SELECT
413 operations_v1.public_key
414 FROM
415 operations_v1
416 WHERE
417 operations_v1.operation_id = documents.document_id
418 AND
419 {filter_cmp}
420 )
421 "#
422 ))
423 }
424 Field::Meta(MetaField::Edited) => {
425 if let FilterBy::Element(OperationValue::Boolean(filter_value)) =
426 filter_setting.by
427 {
428 let edited_flag = if filter_value { "true" } else { "false" };
431 Some(format!("AND is_edited = {edited_flag}"))
432 } else {
433 None
434 }
435 }
436 Field::Meta(MetaField::Deleted) => {
437 if let FilterBy::Element(OperationValue::Boolean(filter_value)) =
438 filter_setting.by
439 {
440 let deleted_flag = if filter_value { "true" } else { "false" };
443 Some(format!("AND documents.is_deleted = {deleted_flag}"))
444 } else {
445 None
446 }
447 }
448 Field::Meta(MetaField::DocumentId) => Some(format!(
449 "AND {}",
450 cmp_sql("documents.document_id", filter_setting, &mut args)
451 )),
452 Field::Meta(MetaField::DocumentViewId) => Some(format!(
453 "AND {}",
454 cmp_sql("documents.document_view_id", filter_setting, &mut args)
455 )),
456 Field::Field(field_name) => {
457 let field_sql = typecast_field_sql("operation_fields_v1.value", field_name, schema, true);
458 let filter_cmp = cmp_sql(&field_sql, filter_setting, &mut args);
459
460 Some(format!(
461 r#"
462 AND EXISTS (
463 SELECT
464 operation_fields_v1.value
465 FROM
466 document_view_fields AS document_view_fields_subquery
467 JOIN operation_fields_v1
468 ON
469 document_view_fields_subquery.operation_id = operation_fields_v1.operation_id
470 AND
471 document_view_fields_subquery.name = operation_fields_v1.name
472 WHERE
473 -- Match document_view_fields of this subquery with the parent one
474 document_view_fields.document_view_id = document_view_fields_subquery.document_view_id
475
476 -- Check if this document view fullfils this filter
477 AND operation_fields_v1.name = '{field_name}'
478 AND
479 {filter_cmp}
480 AND
481 operation_fields_v1.operation_id = document_view_fields_subquery.operation_id
482 )
483 "#
484 ))
485 }
486 }
487 })
488 .collect::<Vec<String>>()
489 .join("\n");
490
491 (sql, args)
492}
493
494async fn where_pagination_sql(
523 pool: &Pool,
524 bind_args: &mut Vec<BindArgument>,
525 pagination: &Pagination<PaginationCursor>,
526 fields: &ApplicationFields,
527 list: Option<&RelationList>,
528 schema: &Schema,
529 order: &Order,
530) -> Result<String, DocumentStorageError> {
531 if pagination.after.is_none() {
533 return Ok("".to_string());
534 }
535
536 if let (Some(relation_list), Some(cursor)) = (list, pagination.after.as_ref()) {
539 if Some(&relation_list.root_view_id) != cursor.root_view_id.as_ref() {
540 return Ok("".to_string());
541 }
542 }
543
544 let cursor = pagination
546 .after
547 .as_ref()
548 .expect("Expect cursor to be set at this point");
549 let operation_cursor = &cursor.operation_cursor;
550
551 let cursor_sql = match list {
552 Some(_) => {
553 let root_cursor = cursor
554 .root_operation_cursor
555 .as_ref()
556 .expect("Expect root_operation_cursor to be set when querying relation list");
557
558 format!("operation_fields_v1_list.cursor > '{root_cursor}'")
559 }
560 None => {
561 format!("operation_fields_v1.cursor > '{operation_cursor}'")
562 }
563 };
564
565 let cmp_direction = match order.direction {
566 Direction::Ascending => ">",
567 Direction::Descending => "<",
568 };
569
570 match &order.field {
571 None => match list {
580 None => {
581 let cmp_value_pre = format!(
595 r#"
596 SELECT
597 document_view_fields.document_view_id
598 FROM
599 operation_fields_v1
600 JOIN document_view_fields
601 ON operation_fields_v1.operation_id = document_view_fields.operation_id
602 WHERE
603 operation_fields_v1.cursor = '{operation_cursor}'
604 LIMIT 1
605 "#
606 );
607
608 let document_view_id: (String,) = query_as(&cmp_value_pre)
611 .fetch_one(pool)
612 .await
613 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
614
615 Ok(format!(
616 "AND documents.document_view_id > '{}'",
617 document_view_id.0
618 ))
619 }
620 Some(_) => {
621 let root_cursor = cursor
638 .root_operation_cursor
639 .as_ref()
640 .expect("Expect root_operation_cursor to be set when querying relation list");
641
642 let cmp_value_pre = format!(
643 r#"
644 -- When ordering is activated we need to compare against the value
645 -- of the ordered field - but from the row where the cursor points at
646 SELECT
647 operation_fields_v1.list_index
648 FROM
649 operation_fields_v1
650 WHERE
651 operation_fields_v1.cursor = '{root_cursor}'
652 "#
653 );
654
655 let list_index: (i32,) = query_as(&cmp_value_pre)
658 .fetch_one(pool)
659 .await
660 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
661
662 Ok(format!(
664 "AND operation_fields_v1_list.list_index > {}",
665 list_index.0
666 ))
667 }
668 },
669
670 Some(Field::Meta(meta_field)) => {
676 let (cmp_value_pre, cmp_field) = match meta_field {
677 MetaField::DocumentId => {
678 let cmp_value = format!(
680 r#"
681 SELECT
682 operations_v1.document_id
683 FROM
684 operation_fields_v1
685 JOIN operations_v1
686 ON operation_fields_v1.operation_id = operations_v1.operation_id
687 WHERE
688 operation_fields_v1.cursor = '{operation_cursor}'
689 LIMIT 1
690 "#
691 );
692
693 (cmp_value, "documents.document_id")
694 }
695 MetaField::DocumentViewId => {
696 let cmp_value = format!(
698 r#"
699 SELECT
700 document_view_fields.document_view_id
701 FROM
702 operation_fields_v1
703 JOIN document_view_fields
704 ON operation_fields_v1.operation_id = document_view_fields.operation_id
705 WHERE
706 operation_fields_v1.cursor = '{operation_cursor}'
707 LIMIT 1
708 "#
709 );
710
711 (cmp_value, "documents.document_view_id")
712 }
713 MetaField::Owner | MetaField::Edited | MetaField::Deleted => {
714 todo!("Not implemented");
716 }
717 };
718
719 let cmp_value: (String,) = query_as(&cmp_value_pre)
722 .fetch_one(pool)
723 .await
724 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
725 let cmp_value = format!("'{}'", cmp_value.0);
726
727 if fields.is_empty() && list.is_none() {
728 Ok(format!("AND {cmp_field} {cmp_direction} {cmp_value}"))
737 } else {
738 Ok(format!(
740 r#"
741 AND (
742 {cmp_field} {cmp_direction} {cmp_value}
743 OR
744 (
745 {cmp_field} = {cmp_value}
746 AND
747 {cursor_sql}
748 )
749 )
750 "#
751 ))
752 }
753 }
754
755 Some(Field::Field(order_field_name)) => {
780 let cmp_value_pre = format!(
783 r#"
784 SELECT
785 operation_fields_v1.value
786
787 FROM
788 operation_fields_v1
789 LEFT JOIN
790 document_view_fields
791 ON document_view_fields.operation_id = operation_fields_v1.operation_id
792
793 WHERE
794 document_view_fields.document_view_id = (
795 SELECT
796 document_view_fields.document_view_id
797
798 FROM
799 operation_fields_v1
800 LEFT JOIN
801 document_view_fields
802 ON document_view_fields.operation_id = operation_fields_v1.operation_id
803
804 WHERE
805 operation_fields_v1.cursor = '{operation_cursor}'
806
807 LIMIT 1
808 )
809 AND operation_fields_v1.name = '{order_field_name}'
810
811 LIMIT 1
812 "#
813 );
814
815 let operation_fields_value: (String,) = query_as(&cmp_value_pre)
821 .fetch_one(pool)
822 .await
823 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
824 bind_args.push(BindArgument::String(operation_fields_value.0));
825
826 let cmp_field =
828 typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false);
829
830 let bind_arg_marker = typecast_field_sql(
831 &format!("${}", bind_args.len()),
832 order_field_name,
833 schema,
834 false,
835 );
836
837 Ok(format!(
839 r#"
840 AND EXISTS (
841 SELECT
842 operation_fields_v1.value
843 FROM
844 operation_fields_v1
845 LEFT JOIN document_view_fields
846 ON operation_fields_v1.operation_id = document_view_fields.operation_id
847 WHERE
848 operation_fields_v1.name = '{order_field_name}'
849 AND
850 document_view_fields.document_view_id = documents.document_view_id
851 AND
852 (
853 {cmp_field} {cmp_direction} {bind_arg_marker}
854 OR
855 (
856 {cmp_field} = {bind_arg_marker}
857 AND
858 {cursor_sql}
859 )
860 )
861 )
862 "#
863 ))
864 }
865 }
866}
867
868fn order_sql(
869 order: &Order,
870 schema: &Schema,
871 list: Option<&RelationList>,
872 fields: &ApplicationFields,
873) -> String {
874 let custom = order
876 .field
877 .as_ref()
878 .map(|field| match field {
879 Field::Meta(MetaField::DocumentId) => "documents.document_id".to_string(),
880 Field::Meta(MetaField::DocumentViewId) => "documents.document_view_id".to_string(),
881 Field::Meta(MetaField::Owner) => "owner".to_string(),
882 Field::Meta(MetaField::Edited) => "is_edited".to_string(),
883 Field::Meta(MetaField::Deleted) => "is_deleted".to_string(),
884 Field::Field(field_name) => {
885 format!(
886 r#"
887 (
888 SELECT
889 {}
890 FROM
891 operation_fields_v1
892 LEFT JOIN document_view_fields
893 ON operation_fields_v1.operation_id = document_view_fields.operation_id
894 WHERE
895 operation_fields_v1.name = '{}'
896 AND document_view_fields.document_view_id = documents.document_view_id
897 LIMIT 1
898 )
899 "#,
900 typecast_field_sql("operation_fields_v1.value", field_name, schema, false),
901 field_name,
902 )
903 }
904 })
905 .map(|field| {
906 let direction = match order.direction {
907 Direction::Ascending => "ASC",
908 Direction::Descending => "DESC",
909 };
910
911 format!("{field} {direction}")
912 });
913
914 let list_sql = match (&order.field, list) {
916 (None, Some(_)) => Some("operation_fields_v1_list.list_index ASC".to_string()),
917 _ => None,
918 };
919
920 let id_sql = {
923 if fields.len() > 1 {
924 match order.field {
925 Some(Field::Meta(MetaField::DocumentViewId)) => None,
926 _ => Some("documents.document_view_id ASC".to_string()),
927 }
928 } else {
929 None
939 }
940 };
941
942 let cursor_sql = match list {
945 Some(_) => Some("operation_fields_v1_list.cursor ASC".to_string()),
946 None => Some("operation_fields_v1.cursor ASC".to_string()),
947 };
948
949 let order = concatenate_sql(&[custom, list_sql, id_sql, cursor_sql]);
950
951 format!("ORDER BY {order}")
952}
953
954fn limit_sql<C>(pagination: &Pagination<C>, fields: &ApplicationFields) -> (u64, String)
955where
956 C: Cursor,
957{
958 let page_size = pagination.first.get() * std::cmp::max(1, fields.len() as u64);
961
962 (page_size, format!("LIMIT {page_size} + 1"))
964}
965
966fn where_fields_sql(fields: &ApplicationFields) -> String {
967 if fields.is_empty() {
968 "".to_string()
969 } else {
970 let fields_sql: Vec<String> = fields.iter().map(|field| format!("'{field}'")).collect();
971 format!(
972 "AND operation_fields_v1.name IN ({})",
973 fields_sql.join(", ")
974 )
975 }
976}
977
978fn select_edited_sql(select: &Select) -> Option<String> {
979 if select.fields.contains(&Field::Meta(MetaField::Edited)) {
980 let sql = r#"
981 -- Check if there is more operations next to the initial "create" operation
982 (
983 SELECT
984 operations_v1.public_key
985 FROM
986 operations_v1
987 WHERE
988 operations_v1.action != "create"
989 AND
990 operations_v1.document_id = documents.document_id
991 LIMIT 1
992 ) AS is_edited
993 "#
994 .to_string();
995
996 Some(sql)
997 } else {
998 None
999 }
1000}
1001
1002fn select_owner_sql(select: &Select) -> Option<String> {
1003 if select.fields.contains(&Field::Meta(MetaField::Owner)) {
1004 let sql = r#"
1007 (
1008 SELECT
1009 operations_v1.public_key
1010 FROM
1011 operations_v1
1012 WHERE
1013 operations_v1.operation_id = documents.document_id
1014 ) AS owner
1015 "#
1016 .to_string();
1017
1018 Some(sql)
1019 } else {
1020 None
1021 }
1022}
1023
1024fn select_fields_sql(fields: &ApplicationFields) -> Vec<Option<String>> {
1025 let mut select = Vec::new();
1026
1027 if !fields.is_empty() {
1028 select.push(Some("operation_fields_v1.name".to_string()));
1030 select.push(Some("operation_fields_v1.value".to_string()));
1031 select.push(Some("operation_fields_v1.field_type".to_string()));
1032 }
1033
1034 select
1035}
1036
1037fn select_cursor_sql(list: Option<&RelationList>) -> Vec<Option<String>> {
1038 match list {
1039 Some(_) => {
1040 vec![
1041 Some("operation_fields_v1.cursor AS cmp_value_cursor".to_string()),
1042 Some("operation_fields_v1_list.cursor AS root_cursor".to_string()),
1043 ]
1044 }
1045 None => vec![Some(
1046 "operation_fields_v1.cursor AS cmp_value_cursor".to_string(),
1047 )],
1048 }
1049}
1050
1051fn where_sql(schema: &Schema, fields: &ApplicationFields, list: Option<&RelationList>) -> String {
1052 let schema_id = schema.id();
1053
1054 let list_index_sql = "AND operation_fields_v1.list_index = 0";
1056
1057 let extra_field_select = if fields.is_empty() {
1061 let (field_name, _) = schema.fields().iter().next().unwrap();
1062 format!("AND operation_fields_v1.name = '{field_name}'")
1063 } else {
1064 "".to_string()
1065 };
1066
1067 match list {
1068 None => {
1069 format!(
1071 r#"
1072 documents.schema_id = '{schema_id}'
1073 {list_index_sql}
1074 {extra_field_select}
1075 "#
1076 )
1077 }
1078 Some(relation_list) => {
1079 let field_name = &relation_list.field;
1081 let root_view_id = &relation_list.root_view_id;
1082 let field_type = match relation_list.list_type {
1083 RelationListType::Pinned => "pinned_relation_list",
1084 RelationListType::Unpinned => "relation_list",
1085 };
1086
1087 format!(
1088 r#"
1089 document_view_fields_list.document_view_id = '{root_view_id}'
1090 AND
1091 operation_fields_v1_list.field_type = '{field_type}'
1092 AND
1093 operation_fields_v1_list.name = '{field_name}'
1094 {list_index_sql}
1095 {extra_field_select}
1096 "#
1097 )
1098 }
1099 }
1100}
1101
1102fn from_sql(list: Option<&RelationList>) -> String {
1103 match list {
1104 Some(relation_list) => {
1105 let filter_sql = match relation_list.list_type {
1106 RelationListType::Pinned => "documents.document_view_id",
1107 RelationListType::Unpinned => "documents.document_id",
1108 };
1109
1110 format!(
1111 r#"
1112 -- Select relation list of parent document first ..
1113 document_view_fields document_view_fields_list
1114 JOIN operation_fields_v1 operation_fields_v1_list
1115 ON
1116 document_view_fields_list.operation_id = operation_fields_v1_list.operation_id
1117 AND
1118 document_view_fields_list.name = operation_fields_v1_list.name
1119
1120 -- .. and join the related documents afterwards
1121 JOIN documents
1122 ON
1123 operation_fields_v1_list.value = {filter_sql}
1124 JOIN document_view_fields
1125 ON documents.document_view_id = document_view_fields.document_view_id
1126 "#
1127 )
1128 }
1129 None => r#"
1131 documents
1132 JOIN document_view_fields
1133 ON documents.document_view_id = document_view_fields.document_view_id
1134 "#
1135 .to_string(),
1136 }
1137}
1138
1139impl SqlStore {
1140 pub async fn query(
1146 &self,
1147 schema: &Schema,
1148 args: &Query<PaginationCursor>,
1149 list: Option<&RelationList>,
1150 ) -> Result<QueryResponse, DocumentStorageError> {
1151 let application_fields = args.select.application_fields();
1153
1154 let mut select_vec = vec![
1156 Some("documents.document_id".to_string()),
1159 Some("documents.document_view_id".to_string()),
1160 Some("document_view_fields.operation_id".to_string()),
1161 Some("documents.is_deleted".to_string()),
1164 ];
1165
1166 select_vec.append(&mut select_cursor_sql(list));
1169
1170 select_vec.push(select_edited_sql(&args.select));
1172 select_vec.push(select_owner_sql(&args.select));
1173 select_vec.append(&mut select_fields_sql(&application_fields));
1174
1175 let select = concatenate_sql(&select_vec);
1176
1177 let from = from_sql(list);
1178
1179 let where_ = where_sql(schema, &application_fields, list);
1180 let and_fields = where_fields_sql(&application_fields);
1181 let (and_filters, mut bind_args) = where_filter_sql(&args.filter, schema);
1182 let and_pagination = where_pagination_sql(
1183 &self.pool,
1184 &mut bind_args,
1185 &args.pagination,
1186 &application_fields,
1187 list,
1188 schema,
1189 &args.order,
1190 )
1191 .await?;
1192
1193 let order = order_sql(&args.order, schema, list, &application_fields);
1194 let (page_size, limit) = limit_sql(&args.pagination, &application_fields);
1195
1196 let sea_quel = format!(
1197 r#"
1198 -- ░░░░░▄▀▀▀▄░░░░░░░░░░░░░░░░░
1199 -- ▄███▀░◐░░░▌░░░░░░░░░░░░░░░░
1200 -- ░░░░▌░░░░░▐░░░░░░░░░░░░░░░░
1201 -- ░░░░▐░░░░░▐░░░░░░░░░░░░░░░░
1202 -- ░░░░▌░░░░░▐▄▄░░░░░░░░░░░░░░
1203 -- ░░░░▌░░░░▄▀▒▒▀▀▀▀▄░░░░░░░░░
1204 -- ░░░▐░░░░▐▒▒▒▒▒▒▒▒▀▀▄░░░░░░░
1205 -- ░░░▐░░░░▐▄▒▒▒▒▒▒▒▒▒▒▀▄░░░░░
1206 -- ░░░░▀▄░░░░▀▄▒▒▒▒▒▒▒▒▒▒▀▄░░░
1207 -- ░░░░░░▀▄▄▄▄▄█▄▄▄▄▄▄▄▄▄▄▄▀▄░
1208 -- ░░░░░░░░░░░▌▌░▌▌░░░░░░░░░░░
1209 -- ░░░░░░░░░░░▌▌░▌▌░░░░░░░░░░░
1210 -- ░░░░░░░░░▄▄▌▌▄▌▌░░░░░░░░░░░
1211 --
1212 -- Hello, my name is sea gull.
1213 -- I designed a SQL query which
1214 -- is as smart, as big and as
1215 -- annoying as myself.
1216
1217 SELECT
1218 {select}
1219
1220 FROM
1221 -- Usually we query the "documents" table first. In case we're looking at a relation
1222 -- list this is slighly more complicated and we need to do some additional JOINs
1223 {from}
1224
1225 -- We need to add some more JOINs to get the values from the operations
1226 JOIN operation_fields_v1
1227 ON
1228 document_view_fields.operation_id = operation_fields_v1.operation_id
1229 AND
1230 document_view_fields.name = operation_fields_v1.name
1231
1232 WHERE
1233 -- We filter by the queried schema of that collection, if it is a relation
1234 -- list we filter by the view id of the parent document
1235 {where_}
1236
1237 -- .. and select only the operation fields we're interested in
1238 {and_fields}
1239
1240 -- .. and further filter the data by custom parameters
1241 {and_filters}
1242
1243 -- Lastly we batch all results into smaller chunks via cursor pagination
1244 {and_pagination}
1245
1246 -- We always order the rows by document id and list index, but there might also be
1247 -- user-defined ordering on top of that
1248 {order}
1249
1250 -- Connected to cursor pagination we limit the number of rows
1251 {limit}
1252 "#
1253 );
1254
1255 let mut query = query_as::<_, QueryRow>(&sea_quel);
1256
1257 query = bind_to_query(query, &bind_args);
1259
1260 let mut rows: Vec<QueryRow> = query
1261 .fetch_all(&self.pool)
1262 .await
1263 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
1264
1265 let has_next_page = if rows.len() as u64 > page_size {
1268 rows.pop();
1270 true
1271 } else {
1272 false
1273 };
1274
1275 let total_count = if args
1277 .pagination
1278 .fields
1279 .contains(&PaginationField::TotalCount)
1280 {
1281 Some(self.count(schema, args, list).await?)
1282 } else {
1283 None
1284 };
1285
1286 let documents = convert_rows(rows, list, &application_fields, schema.id());
1288
1289 let start_cursor = if args
1291 .pagination
1292 .fields
1293 .contains(&PaginationField::StartCursor)
1294 {
1295 documents.first().map(|(cursor, _)| cursor.to_owned())
1296 } else {
1297 None
1298 };
1299
1300 let end_cursor = if args.pagination.fields.contains(&PaginationField::EndCursor) {
1301 documents.last().map(|(cursor, _)| cursor.to_owned())
1302 } else {
1303 None
1304 };
1305
1306 let pagination_data = PaginationData {
1307 total_count,
1308 has_next_page,
1309 has_previous_page: false,
1312 start_cursor,
1313 end_cursor,
1314 };
1315
1316 Ok((pagination_data, documents))
1317 }
1318
1319 pub async fn count(
1321 &self,
1322 schema: &Schema,
1323 args: &Query<PaginationCursor>,
1324 list: Option<&RelationList>,
1325 ) -> Result<u64, DocumentStorageError> {
1326 let application_fields = args.select.application_fields();
1327
1328 let from = from_sql(list);
1329 let where_ = where_sql(schema, &application_fields, list);
1330 let (and_filters, bind_args) = where_filter_sql(&args.filter, schema);
1331
1332 let count_sql = format!(
1333 r#"
1334 SELECT
1335 COUNT(documents.document_id)
1336
1337 FROM
1338 {from}
1339
1340 JOIN operation_fields_v1
1341 ON
1342 document_view_fields.operation_id = operation_fields_v1.operation_id
1343 AND
1344 document_view_fields.name = operation_fields_v1.name
1345
1346 WHERE
1347 {where_}
1348 {and_filters}
1349
1350 -- Group application fields by name to make sure we get actual number of documents
1351 GROUP BY operation_fields_v1.name
1352 "#
1353 );
1354
1355 let mut query = query_as::<_, (i64,)>(&count_sql);
1356
1357 query = bind_to_query(query, &bind_args);
1359
1360 let result: Option<(i64,)> = query
1361 .fetch_optional(&self.pool)
1362 .await
1363 .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
1364
1365 let count = match result {
1366 Some(result) => result.0 as u64,
1367 None => 0,
1368 };
1369
1370 Ok(count)
1371 }
1372}
1373
1374fn convert_rows(
1398 rows: Vec<QueryRow>,
1399 list: Option<&RelationList>,
1400 fields: &ApplicationFields,
1401 schema_id: &SchemaId,
1402) -> Vec<(PaginationCursor, StorageDocument)> {
1403 let mut converted: Vec<(PaginationCursor, StorageDocument)> = Vec::new();
1404
1405 if rows.is_empty() {
1406 return converted;
1407 }
1408
1409 let finalize_document = |row: &QueryRow,
1411 collected_fields: Vec<DocumentViewFieldRow>,
1412 collected_rows: &HashMap<FieldName, QueryRow>|
1413 -> (PaginationCursor, StorageDocument) {
1414 let cursor = {
1416 let last_field = collected_fields
1417 .last()
1418 .expect("Needs to be at least one field");
1419
1420 row_to_cursor(
1421 collected_rows
1422 .get(&last_field.name)
1423 .expect("Field selected for ordering needs to be inside of document"),
1424 list,
1425 )
1426 };
1427
1428 let fields = parse_document_view_field_rows(collected_fields);
1430
1431 let document = StorageDocument {
1432 id: row.document_id.parse().unwrap(),
1433 fields: Some(fields),
1434 schema_id: schema_id.clone(),
1435 view_id: row.document_view_id.parse().unwrap(),
1436 author: (&row.owner).into(),
1437 deleted: row.is_deleted,
1438 };
1439
1440 (cursor, document)
1441 };
1442
1443 let rows_per_document = std::cmp::max(fields.len(), 1);
1444
1445 let mut current = rows[0].clone();
1446 let mut current_fields = Vec::with_capacity(rows_per_document);
1447 let mut current_rows = HashMap::new();
1448
1449 for (index, row) in rows.into_iter().enumerate() {
1450 if index % rows_per_document == 0 && index > 0 {
1452 let (cursor, document) = finalize_document(¤t, current_fields, ¤t_rows);
1454 converted.push((cursor, document));
1455
1456 current = row.clone();
1458 current_fields = Vec::with_capacity(rows_per_document);
1459 }
1460
1461 current_rows.insert(row.name.clone(), row.clone());
1463
1464 current_fields.push(DocumentViewFieldRow {
1466 document_id: row.document_id,
1467 document_view_id: row.document_view_id,
1468 operation_id: row.operation_id,
1469 name: row.name,
1470 list_index: 0,
1471 field_type: row.field_type,
1472 value: row.value,
1473 });
1474 }
1475
1476 let (cursor, document) = finalize_document(¤t, current_fields, ¤t_rows);
1478 converted.push((cursor, document));
1479
1480 converted
1481}
1482
1483fn row_to_cursor(row: &QueryRow, list: Option<&RelationList>) -> PaginationCursor {
1485 match list {
1486 Some(relation_list) => {
1487 PaginationCursor::new(
1491 row.cmp_value_cursor.as_str().into(),
1492 Some(row.root_cursor.as_str().into()),
1493 Some(relation_list.root_view_id.clone()),
1494 )
1495 }
1496 None => PaginationCursor::new(row.cmp_value_cursor.as_str().into(), None, None),
1497 }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502 use std::num::NonZeroU64;
1503
1504 use p2panda_rs::document::traits::AsDocument;
1505 use p2panda_rs::document::DocumentViewId;
1506 use p2panda_rs::hash::Hash;
1507 use p2panda_rs::identity::KeyPair;
1508 use p2panda_rs::operation::{OperationValue, PinnedRelationList};
1509 use p2panda_rs::schema::{FieldType, Schema, SchemaId};
1510 use p2panda_rs::storage_provider::traits::DocumentStore;
1511 use p2panda_rs::test_utils::fixtures::{key_pair, schema_id};
1512 use rstest::rstest;
1513
1514 use crate::db::models::{OptionalOwner, QueryRow};
1515 use crate::db::query::{
1516 Direction, Field, Filter, MetaField, Order, Pagination, PaginationField, Select,
1517 };
1518 use crate::db::stores::{OperationCursor, RelationList};
1519 use crate::db::types::StorageDocument;
1520 use crate::test_utils::{
1521 add_document, add_schema, add_schema_and_documents, doggo_fields, doggo_schema,
1522 populate_and_materialize, populate_store_config, test_runner, PopulateStoreConfig,
1523 TestNode,
1524 };
1525
1526 use super::{convert_rows, PaginationCursor, Query};
1527
1528 fn get_document_value(document: &StorageDocument, field: &str) -> OperationValue {
1529 document
1530 .fields()
1531 .expect("Expected document fields")
1532 .get(field)
1533 .unwrap_or_else(|| panic!("{}", "Expected '{field}' field to exist in document"))
1534 .value()
1535 .to_owned()
1536 }
1537
1538 async fn create_events_test_data(
1539 node: &mut TestNode,
1540 key_pair: &KeyPair,
1541 ) -> (Schema, Vec<DocumentViewId>) {
1542 add_schema_and_documents(
1543 node,
1544 "events",
1545 vec![
1546 vec![
1547 (
1548 "title",
1549 "Kids Bits! Chiptune for baby squirrels".into(),
1550 None,
1551 ),
1552 ("date", "2023-04-17".into(), None),
1553 ("ticket_price", 5.75.into(), None),
1554 ],
1555 vec![
1556 ("title", "The Pandadoodle Flute Trio".into(), None),
1557 ("date", "2023-04-14".into(), None),
1558 ("ticket_price", 12.5.into(), None),
1559 ],
1560 vec![
1561 ("title", "Eventual Consistent Grapefruit".into(), None),
1562 ("date", "2023-05-02".into(), None),
1563 ("ticket_price", 24.99.into(), None),
1564 ],
1565 vec![
1566 (
1567 "title",
1568 "Bamboo-Scrumble Rumba Night - Xmas special".into(),
1569 None,
1570 ),
1571 ("date", "2023-12-20".into(), None),
1572 ("ticket_price", 99.0.into(), None),
1573 ],
1574 vec![
1575 ("title", "Shoebill - Non-migratory Shoegaze".into(), None),
1576 ("date", "2023-09-09".into(), None),
1577 ("ticket_price", 10.00.into(), None),
1578 ],
1579 ],
1580 key_pair,
1581 )
1582 .await
1583 }
1584
1585 async fn create_venues_test_data(
1586 node: &mut TestNode,
1587 key_pair: &KeyPair,
1588 ) -> (Schema, Vec<DocumentViewId>) {
1589 add_schema_and_documents(
1590 node,
1591 "venues",
1592 vec![
1593 vec![("name", "World Wide Feld".into(), None)],
1594 vec![("name", "Internet Explorer".into(), None)],
1595 vec![("name", "p4p space".into(), None)],
1596 ],
1597 key_pair,
1598 )
1599 .await
1600 }
1601
1602 async fn create_visited_test_data(
1603 node: &mut TestNode,
1604 venues_view_ids: Vec<DocumentViewId>,
1605 venues_schema: Schema,
1606 key_pair: &KeyPair,
1607 ) -> (Schema, Vec<DocumentViewId>) {
1608 add_schema_and_documents(
1609 node,
1610 "visited",
1611 vec![
1612 vec![
1613 ("user", "seagull".into(), None),
1614 (
1615 "venues",
1616 vec![
1617 venues_view_ids[0].clone(),
1618 venues_view_ids[0].clone(),
1619 venues_view_ids[1].clone(),
1620 venues_view_ids[2].clone(),
1621 venues_view_ids[0].clone(),
1622 venues_view_ids[0].clone(),
1623 venues_view_ids[1].clone(),
1624 ]
1625 .into(),
1626 Some(venues_schema.id().to_owned()),
1627 ),
1628 ],
1629 vec![
1630 ("user", "panda".into(), None),
1631 (
1632 "venues",
1633 vec![
1634 venues_view_ids[1].clone(),
1635 venues_view_ids[1].clone(),
1636 venues_view_ids[2].clone(),
1637 ]
1638 .into(),
1639 Some(venues_schema.id().to_owned()),
1640 ),
1641 ],
1642 ],
1643 key_pair,
1644 )
1645 .await
1646 }
1647
1648 async fn create_chat_test_data(
1649 node: &mut TestNode,
1650 key_pair: &KeyPair,
1651 ) -> (Schema, Vec<DocumentViewId>) {
1652 add_schema_and_documents(
1653 node,
1654 "chat",
1655 vec![
1656 vec![
1657 ("message", "Hello, Panda!".into(), None),
1658 ("username", "penguin".into(), None),
1659 ("timestamp", 1687265969.into(), None),
1660 ],
1661 vec![
1662 ("message", "Oh, howdy, Pengi!".into(), None),
1663 ("username", "panda".into(), None),
1664 ("timestamp", 1687266014.into(), None),
1665 ],
1666 vec![
1667 ("message", "How are you?".into(), None),
1668 ("username", "panda".into(), None),
1669 ("timestamp", 1687266032.into(), None),
1670 ],
1671 vec![
1672 ("message", "I miss Pengolina. How about you?".into(), None),
1673 ("username", "penguin".into(), None),
1674 ("timestamp", 1687266055.into(), None),
1675 ],
1676 vec![
1677 ("message", "I am cute and very hungry".into(), None),
1678 ("username", "panda".into(), None),
1679 ("timestamp", 1687266141.into(), None),
1680 ],
1681 vec![
1682 ("message", "(°◇°) !!".into(), None),
1683 ("username", "penguin".into(), None),
1684 ("timestamp", 1687266160.into(), None),
1685 ],
1686 ],
1687 key_pair,
1688 )
1689 .await
1690 }
1691
1692 #[rstest]
1693 #[case::order_by_date_asc(
1694 Query::new(
1695 &Pagination::default(),
1696 &Select::new(&["title".into()]),
1697 &Filter::new(),
1698 &Order::new(&"date".into(), &Direction::Ascending),
1699 ),
1700 "title".into(),
1701 vec![
1702 "The Pandadoodle Flute Trio".into(),
1703 "Kids Bits! Chiptune for baby squirrels".into(),
1704 "Eventual Consistent Grapefruit".into(),
1705 "Shoebill - Non-migratory Shoegaze".into(),
1706 "Bamboo-Scrumble Rumba Night - Xmas special".into(),
1707 ],
1708 )]
1709 #[case::order_by_date_desc(
1710 Query::new(
1711 &Pagination::default(),
1712 &Select::new(&["date".into()]),
1713 &Filter::new(),
1714 &Order::new(&"date".into(), &Direction::Descending),
1715 ),
1716 "date".into(),
1717 vec![
1718 "2023-12-20".into(),
1719 "2023-09-09".into(),
1720 "2023-05-02".into(),
1721 "2023-04-17".into(),
1722 "2023-04-14".into(),
1723 ],
1724 )]
1725 #[case::filter_by_ticket_price_range(
1726 Query::new(
1727 &Pagination::default(),
1728 &Select::new(&["ticket_price".into()]),
1729 &Filter::new().fields(&[
1730 ("ticket_price_gt", &[10.0.into()]),
1731 ("ticket_price_lt", &[50.0.into()]),
1732 ]),
1733 &Order::new(&"ticket_price".into(), &Direction::Ascending),
1734 ),
1735 "ticket_price".into(),
1736 vec![
1737 12.5.into(),
1738 24.99.into(),
1739 ],
1740 )]
1741 #[case::filter_by_search_string(
1742 Query::new(
1743 &Pagination::default(),
1744 &Select::new(&["title".into()]),
1745 &Filter::new().fields(&[
1746 ("title_contains", &["baby".into()]),
1747 ]),
1748 &Order::default(),
1749 ),
1750 "title".into(),
1751 vec![
1752 "Kids Bits! Chiptune for baby squirrels".into(),
1753 ],
1754 )]
1755 fn basic_queries(
1756 key_pair: KeyPair,
1757 #[case] args: Query<PaginationCursor>,
1758 #[case] selected_field: String,
1759 #[case] expected_fields: Vec<OperationValue>,
1760 ) {
1761 test_runner(|mut node: TestNode| async move {
1762 let (schema, _) = create_events_test_data(&mut node, &key_pair).await;
1763
1764 let (_, documents) = node
1765 .context
1766 .store
1767 .query(&schema, &args, None)
1768 .await
1769 .expect("Query failed");
1770
1771 assert_eq!(documents.len(), expected_fields.len());
1772
1773 for (index, expected_value) in expected_fields.into_iter().enumerate() {
1775 assert_eq!(
1776 get_document_value(&documents[index].1, &selected_field),
1777 expected_value
1778 );
1779 }
1780 });
1781 }
1782
1783 #[rstest]
1784 #[case::order_by_timestamp(
1785 Order::new(&"timestamp".into(), &Direction::Ascending),
1786 "message".into(),
1787 vec![
1788 "Hello, Panda!".into(),
1789 "Oh, howdy, Pengi!".into(),
1790 "How are you?".into(),
1791 "I miss Pengolina. How about you?".into(),
1792 "I am cute and very hungry".into(),
1793 "(°◇°) !!".into(),
1794 ],
1795 )]
1796 #[case::order_by_timestamp_descending(
1797 Order::new(&"timestamp".into(), &Direction::Descending),
1798 "message".into(),
1799 vec![
1800 "(°◇°) !!".into(),
1801 "I am cute and very hungry".into(),
1802 "I miss Pengolina. How about you?".into(),
1803 "How are you?".into(),
1804 "Oh, howdy, Pengi!".into(),
1805 "Hello, Panda!".into(),
1806 ],
1807 )]
1808 #[case::order_by_message(
1809 Order::new(&"message".into(), &Direction::Ascending),
1810 "message".into(),
1811 vec![
1812 "(°◇°) !!".into(),
1813 "Hello, Panda!".into(),
1814 "How are you?".into(),
1815 "I am cute and very hungry".into(),
1816 "I miss Pengolina. How about you?".into(),
1817 "Oh, howdy, Pengi!".into(),
1818 ],
1819 )]
1820 fn pagination_over_ordered_fields(
1821 key_pair: KeyPair,
1822 #[case] order: Order,
1823 #[case] selected_field: String,
1824 #[case] expected_fields: Vec<OperationValue>,
1825 ) {
1826 test_runner(|mut node: TestNode| async move {
1827 let (schema, view_ids) = create_chat_test_data(&mut node, &key_pair).await;
1828
1829 let mut cursor: Option<PaginationCursor> = None;
1830
1831 let mut args = Query::new(
1832 &Pagination::new(
1833 &NonZeroU64::new(1).unwrap(),
1834 cursor.as_ref(),
1835 &vec![
1836 PaginationField::TotalCount,
1837 PaginationField::EndCursor,
1838 PaginationField::HasNextPage,
1839 ],
1840 ),
1841 &Select::new(&[
1842 Field::Meta(MetaField::DocumentViewId),
1843 Field::Field("message".into()),
1844 Field::Field("username".into()),
1845 Field::Field("timestamp".into()),
1846 ]),
1847 &Filter::default(),
1848 &order,
1849 );
1850
1851 for (index, expected_field) in expected_fields.into_iter().enumerate() {
1853 args.pagination.after = cursor;
1854
1855 let (pagination_data, documents) = node
1856 .context
1857 .store
1858 .query(&schema, &args, None)
1859 .await
1860 .expect("Query failed");
1861
1862 match pagination_data.end_cursor {
1863 Some(end_cursor) => {
1864 cursor = Some(end_cursor);
1865 }
1866 None => panic!("Expected cursor"),
1867 }
1868
1869 if view_ids.len() - 1 == index {
1870 assert!(!pagination_data.has_next_page);
1871 } else {
1872 assert!(pagination_data.has_next_page);
1873 }
1874
1875 assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64));
1876 assert_eq!(documents.len(), 1);
1877 assert_eq!(documents[0].1.get(&selected_field), Some(&expected_field));
1878 assert_eq!(cursor.as_ref(), Some(&documents[0].0));
1879 }
1880
1881 args.pagination.after = cursor;
1883
1884 let (pagination_data, documents) = node
1885 .context
1886 .store
1887 .query(&schema, &args, None)
1888 .await
1889 .expect("Query failed");
1890
1891 assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64));
1892 assert_eq!(pagination_data.end_cursor, None);
1893 assert!(!pagination_data.has_next_page);
1894 assert_eq!(documents.len(), 0);
1895 });
1896 }
1897
1898 #[rstest]
1899 fn pagination_over_ordered_view_ids(key_pair: KeyPair) {
1900 test_runner(|mut node: TestNode| async move {
1901 let (schema, mut view_ids) = create_events_test_data(&mut node, &key_pair).await;
1902 let view_ids_len = view_ids.len();
1903
1904 view_ids.sort();
1907
1908 let mut cursor: Option<PaginationCursor> = None;
1909
1910 let mut args = Query::new(
1911 &Pagination::new(
1912 &NonZeroU64::new(1).unwrap(),
1913 cursor.as_ref(),
1914 &vec![
1915 PaginationField::TotalCount,
1916 PaginationField::EndCursor,
1917 PaginationField::HasNextPage,
1918 ],
1919 ),
1920 &Select::new(&[Field::Meta(MetaField::DocumentViewId)]),
1921 &Filter::default(),
1922 &Order::new(
1923 &Field::Meta(MetaField::DocumentViewId),
1924 &Direction::Ascending,
1925 ),
1926 );
1927
1928 for (index, view_id) in view_ids.into_iter().enumerate() {
1930 args.pagination.after = cursor;
1931
1932 let (pagination_data, documents) = node
1933 .context
1934 .store
1935 .query(&schema, &args, None)
1936 .await
1937 .expect("Query failed");
1938
1939 match pagination_data.end_cursor {
1940 Some(end_cursor) => {
1941 cursor = Some(end_cursor);
1942 }
1943 None => panic!("Expected cursor"),
1944 }
1945
1946 if view_ids_len - 1 == index {
1947 assert!(!pagination_data.has_next_page);
1948 } else {
1949 assert!(pagination_data.has_next_page);
1950 }
1951
1952 assert_eq!(pagination_data.total_count, Some(5));
1953 assert_eq!(documents.len(), 1);
1954 assert_eq!(documents[0].1.view_id, view_id);
1955 assert_eq!(cursor.as_ref(), Some(&documents[0].0));
1956 }
1957
1958 args.pagination.after = cursor;
1960
1961 let (pagination_data, documents) = node
1962 .context
1963 .store
1964 .query(&schema, &args, None)
1965 .await
1966 .expect("Query failed");
1967
1968 assert_eq!(pagination_data.total_count, Some(5));
1969 assert_eq!(pagination_data.end_cursor, None);
1970 assert!(!pagination_data.has_next_page);
1971 assert_eq!(documents.len(), 0);
1972 });
1973 }
1974
1975 #[rstest]
1976 fn pinned_relation_list(key_pair: KeyPair) {
1977 test_runner(|mut node: TestNode| async move {
1978 let (venues_schema, venues_view_ids) =
1979 create_venues_test_data(&mut node, &key_pair).await;
1980
1981 let (_, visited_view_ids) = create_visited_test_data(
1982 &mut node,
1983 venues_view_ids,
1984 venues_schema.to_owned(),
1985 &key_pair,
1986 )
1987 .await;
1988
1989 let args = Query::new(
1990 &Pagination::new(
1991 &NonZeroU64::new(10).unwrap(),
1992 None,
1993 &vec![
1994 PaginationField::TotalCount,
1995 PaginationField::EndCursor,
1996 PaginationField::HasNextPage,
1997 ],
1998 ),
1999 &Select::new(&["name".into()]),
2000 &Filter::default(),
2001 &Order::default(),
2002 );
2003
2004 let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2006
2007 let (pagination_data, documents) = node
2008 .context
2009 .store
2010 .query(&venues_schema, &args, Some(&list))
2011 .await
2012 .expect("Query failed");
2013
2014 assert_eq!(documents.len(), 7);
2015 assert_eq!(pagination_data.total_count, Some(7));
2016 assert_eq!(
2017 get_document_value(&documents[0].1, "name"),
2018 OperationValue::String("World Wide Feld".to_string())
2019 );
2020 assert_eq!(
2021 get_document_value(&documents[4].1, "name"),
2022 OperationValue::String("World Wide Feld".to_string())
2023 );
2024
2025 let list = RelationList::new_pinned(&visited_view_ids[1], "venues");
2027
2028 let (pagination_data, documents) = node
2029 .context
2030 .store
2031 .query(&venues_schema, &args, Some(&list))
2032 .await
2033 .expect("Query failed");
2034
2035 assert_eq!(documents.len(), 3);
2036 assert_eq!(pagination_data.total_count, Some(3));
2037 assert_eq!(
2038 get_document_value(&documents[0].1, "name"),
2039 OperationValue::String("Internet Explorer".to_string())
2040 );
2041 assert_eq!(
2042 get_document_value(&documents[1].1, "name"),
2043 OperationValue::String("Internet Explorer".to_string())
2044 );
2045 assert_eq!(
2046 get_document_value(&documents[2].1, "name"),
2047 OperationValue::String("p4p space".to_string())
2048 );
2049 });
2050 }
2051
2052 #[rstest]
2053 fn empty_pinned_relation_list(key_pair: KeyPair) {
2054 test_runner(|mut node: TestNode| async move {
2055 let (venues_schema, _) = create_venues_test_data(&mut node, &key_pair).await;
2056
2057 let visited_schema = add_schema(
2058 &mut node,
2059 "visited",
2060 vec![(
2061 "venues",
2062 FieldType::PinnedRelationList(venues_schema.id().clone()),
2063 )],
2064 &key_pair,
2065 )
2066 .await;
2067
2068 let visited_view_id = add_document(
2069 &mut node,
2070 visited_schema.id(),
2071 vec![(
2072 "venues",
2073 OperationValue::PinnedRelationList(PinnedRelationList::new(vec![])),
2074 )],
2075 &key_pair,
2076 )
2077 .await;
2078
2079 let args = Query::new(
2081 &Pagination::new(
2082 &NonZeroU64::new(10).unwrap(),
2083 None,
2084 &vec![
2085 PaginationField::TotalCount,
2086 PaginationField::EndCursor,
2087 PaginationField::HasNextPage,
2088 ],
2089 ),
2090 &Select::new(&[Field::Meta(MetaField::DocumentId)]),
2091 &Filter::default(),
2092 &Order::default(),
2093 );
2094
2095 let list = RelationList::new_pinned(&visited_view_id, "venues");
2097
2098 let (_, documents) = node
2099 .context
2100 .store
2101 .query(&venues_schema, &args, Some(&list))
2102 .await
2103 .expect("Query failed");
2104
2105 assert!(documents.is_empty());
2106
2107 let args = Query::new(
2109 &Pagination::new(
2110 &NonZeroU64::new(10).unwrap(),
2111 None,
2112 &vec![
2113 PaginationField::TotalCount,
2114 PaginationField::EndCursor,
2115 PaginationField::HasNextPage,
2116 ],
2117 ),
2118 &Select::new(&["name".into()]),
2119 &Filter::default(),
2120 &Order::default(),
2121 );
2122
2123 let list = RelationList::new_pinned(&visited_view_id, "venues");
2125
2126 let (_, documents) = node
2127 .context
2128 .store
2129 .query(&venues_schema, &args, Some(&list))
2130 .await
2131 .expect("Query failed");
2132
2133 assert!(documents.is_empty());
2134
2135 let args = Query::new(
2137 &Pagination::new(
2138 &NonZeroU64::new(10).unwrap(),
2139 None,
2140 &vec![
2141 PaginationField::TotalCount,
2142 PaginationField::EndCursor,
2143 PaginationField::HasNextPage,
2144 ],
2145 ),
2146 &Select::new(&["venues".into()]),
2147 &Filter::default(),
2148 &Order::default(),
2149 );
2150
2151 node.context
2152 .store
2153 .query(&visited_schema, &args, None)
2154 .await
2155 .expect("Query failed");
2156 });
2157 }
2158
2159 #[rstest]
2160 fn relation_list_pagination_over_ordered_view_ids(key_pair: KeyPair) {
2161 test_runner(|mut node: TestNode| async move {
2162 let (venues_schema, venues_view_ids) =
2163 create_venues_test_data(&mut node, &key_pair).await;
2164
2165 let (_, visited_view_ids) = create_visited_test_data(
2166 &mut node,
2167 venues_view_ids.clone(),
2168 venues_schema.to_owned(),
2169 &key_pair,
2170 )
2171 .await;
2172
2173 let mut view_ids = [
2174 venues_view_ids[0].clone(),
2175 venues_view_ids[0].clone(),
2176 venues_view_ids[1].clone(),
2177 venues_view_ids[2].clone(),
2178 venues_view_ids[0].clone(),
2179 venues_view_ids[0].clone(),
2180 venues_view_ids[1].clone(),
2181 ];
2182 let view_ids_len = view_ids.len();
2183
2184 view_ids.sort();
2187
2188 let mut cursor: Option<PaginationCursor> = None;
2189
2190 let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2192
2193 let mut args = Query::new(
2194 &Pagination::new(
2195 &NonZeroU64::new(1).unwrap(),
2196 cursor.as_ref(),
2197 &vec![
2198 PaginationField::TotalCount,
2199 PaginationField::EndCursor,
2200 PaginationField::HasNextPage,
2201 ],
2202 ),
2203 &Select::new(&[Field::Meta(MetaField::DocumentViewId)]),
2204 &Filter::default(),
2205 &Order::new(
2206 &Field::Meta(MetaField::DocumentViewId),
2207 &Direction::Ascending,
2208 ),
2209 );
2210
2211 for (index, view_id) in view_ids.iter().enumerate() {
2213 args.pagination.after = cursor;
2214
2215 let (pagination_data, documents) = node
2216 .context
2217 .store
2218 .query(&venues_schema, &args, Some(&list))
2219 .await
2220 .expect("Query failed");
2221
2222 match pagination_data.end_cursor {
2223 Some(end_cursor) => {
2224 cursor = Some(end_cursor);
2225 }
2226 None => panic!("Expected cursor"),
2227 }
2228
2229 if view_ids_len - 1 == index {
2230 assert!(!pagination_data.has_next_page);
2231 } else {
2232 assert!(pagination_data.has_next_page);
2233 }
2234
2235 assert_eq!(pagination_data.total_count, Some(7));
2236 assert_eq!(documents.len(), 1);
2237 assert_eq!(&documents[0].1.view_id, view_id);
2238 assert_eq!(cursor.as_ref(), Some(&documents[0].0));
2239 }
2240
2241 args.pagination.after = cursor;
2243
2244 let (pagination_data, documents) = node
2245 .context
2246 .store
2247 .query(&venues_schema, &args, Some(&list))
2248 .await
2249 .expect("Query failed");
2250
2251 assert_eq!(pagination_data.total_count, Some(7));
2252 assert_eq!(pagination_data.end_cursor, None);
2253 assert!(!pagination_data.has_next_page);
2254 assert_eq!(documents.len(), 0);
2255 });
2256 }
2257
2258 #[rstest]
2259 #[case::default(
2260 Filter::default(),
2261 Order::default(),
2262 vec![
2263 "World Wide Feld".to_string(),
2264 "World Wide Feld".to_string(),
2265 "Internet Explorer".to_string(),
2266 "p4p space".to_string(),
2267 "World Wide Feld".to_string(),
2268 "World Wide Feld".to_string(),
2269 "Internet Explorer".to_string(),
2270 ]
2271 )]
2272 #[case::order_by_name(
2273 Filter::default(),
2274 Order::new(&"name".into(), &Direction::Ascending),
2275 vec![
2276 "Internet Explorer".to_string(),
2277 "Internet Explorer".to_string(),
2278 "p4p space".to_string(),
2279 "World Wide Feld".to_string(),
2280 "World Wide Feld".to_string(),
2281 "World Wide Feld".to_string(),
2282 "World Wide Feld".to_string(),
2283 ]
2284 )]
2285 #[case::search_for_text(
2286 Filter::new().fields(&[("name_contains", &["p".into()])]),
2287 Order::default(),
2288 vec![
2289 "Internet Explorer".to_string(),
2290 "p4p space".to_string(),
2291 "Internet Explorer".to_string(),
2292 ]
2293 )]
2294 #[case::filter_and_order_by_name(
2295 Filter::new().fields(&[("name_in", &["World Wide Feld".into(), "Internet Explorer".into()])]),
2296 Order::new(&"name".into(), &Direction::Descending),
2297 vec![
2298 "World Wide Feld".to_string(),
2299 "World Wide Feld".to_string(),
2300 "World Wide Feld".to_string(),
2301 "World Wide Feld".to_string(),
2302 "Internet Explorer".to_string(),
2303 "Internet Explorer".to_string(),
2304 ]
2305 )]
2306 fn paginated_pinned_relation_list(
2307 key_pair: KeyPair,
2308 #[case] filter: Filter,
2309 #[case] order: Order,
2310 #[case] expected_venues: Vec<String>,
2311 ) {
2312 test_runner(|mut node: TestNode| async move {
2313 let (venues_schema, venues_view_ids) =
2314 create_venues_test_data(&mut node, &key_pair).await;
2315
2316 let (_, visited_view_ids) = create_visited_test_data(
2317 &mut node,
2318 venues_view_ids.clone(),
2319 venues_schema.clone(),
2320 &key_pair,
2321 )
2322 .await;
2323
2324 let documents_len = expected_venues.len() as u64;
2325
2326 let mut cursor: Option<PaginationCursor> = None;
2327
2328 let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2330
2331 let mut args = Query::new(
2332 &Pagination::new(
2333 &NonZeroU64::new(1).unwrap(),
2334 cursor.as_ref(),
2335 &vec![
2336 PaginationField::TotalCount,
2337 PaginationField::EndCursor,
2338 PaginationField::HasNextPage,
2339 ],
2340 ),
2341 &Select::new(&[
2342 Field::Meta(MetaField::DocumentViewId),
2343 Field::Meta(MetaField::Owner),
2344 "name".into(),
2345 ]),
2346 &filter,
2347 &order,
2348 );
2349
2350 for (index, expected_venue) in expected_venues.into_iter().enumerate() {
2352 args.pagination.after = cursor;
2353
2354 let (pagination_data, documents) = node
2355 .context
2356 .store
2357 .query(&venues_schema, &args, Some(&list))
2358 .await
2359 .expect("Query failed");
2360
2361 match pagination_data.end_cursor {
2363 Some(end_cursor) => {
2364 cursor = Some(end_cursor);
2365 }
2366 None => panic!("Expected cursor"),
2367 }
2368
2369 if documents_len - 1 == index as u64 {
2371 assert!(!pagination_data.has_next_page);
2372 } else {
2373 assert!(pagination_data.has_next_page);
2374 }
2375
2376 assert_eq!(pagination_data.total_count, Some(documents_len));
2378 assert_eq!(cursor.as_ref(), Some(&documents[0].0));
2379
2380 assert_eq!(documents[0].1.author(), &key_pair.public_key());
2382 assert_eq!(documents.len(), 1);
2383 assert_eq!(
2384 get_document_value(&documents[0].1, "name"),
2385 expected_venue.into()
2386 );
2387 }
2388
2389 args.pagination.after = cursor;
2391
2392 let (pagination_data, documents) = node
2393 .context
2394 .store
2395 .query(&venues_schema, &args, Some(&list))
2396 .await
2397 .expect("Query failed");
2398
2399 assert!(!pagination_data.has_next_page);
2400 assert_eq!(pagination_data.total_count, Some(documents_len));
2401 assert_eq!(pagination_data.end_cursor, None);
2402 assert_eq!(documents.len(), 0);
2403 });
2404 }
2405
2406 #[rstest]
2407 #[case::default(Filter::default(), 3)]
2408 #[case::filtered(Filter::new().fields(&[("name_contains", &["Internet".into()])]), 1)]
2409 #[case::no_results(Filter::new().fields(&[("name", &["doesnotexist".into()])]), 0)]
2410 fn count(#[case] filter: Filter, #[case] expected_result: u64, key_pair: KeyPair) {
2411 test_runner(move |mut node: TestNode| async move {
2412 let (venues_schema, _) = create_venues_test_data(&mut node, &key_pair).await;
2413
2414 let args = Query::new(
2415 &Pagination::default(),
2416 &Select::default(),
2417 &filter,
2418 &Order::default(),
2419 );
2420
2421 let result = node
2422 .context
2423 .store
2424 .count(&venues_schema, &args, None)
2425 .await
2426 .unwrap();
2427
2428 assert_eq!(result, expected_result);
2429 });
2430 }
2431
2432 #[rstest]
2433 #[case::default(Filter::default(), 7)]
2434 #[case::filtered_1(Filter::new().fields(&[("name_contains", &["Internet".into()])]), 2)]
2435 #[case::filtered_2(Filter::new().fields(&[("name_not", &["World Wide Feld".into()])]), 3)]
2436 #[case::no_results(Filter::new().fields(&[("name", &["doesnotexist".into()])]), 0)]
2437 fn count_relation_list(
2438 #[case] filter: Filter,
2439 #[case] expected_result: u64,
2440 key_pair: KeyPair,
2441 ) {
2442 test_runner(move |mut node: TestNode| async move {
2443 let (venues_schema, venues_view_ids) =
2444 create_venues_test_data(&mut node, &key_pair).await;
2445
2446 let (_, visited_view_ids) = create_visited_test_data(
2447 &mut node,
2448 venues_view_ids.clone(),
2449 venues_schema.clone(),
2450 &key_pair,
2451 )
2452 .await;
2453
2454 let args = Query::new(
2455 &Pagination::default(),
2456 &Select::default(),
2457 &filter,
2458 &Order::default(),
2459 );
2460
2461 let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2463
2464 let result = node
2465 .context
2466 .store
2467 .count(&venues_schema, &args, Some(&list))
2468 .await
2469 .unwrap();
2470
2471 assert_eq!(result, expected_result);
2472 });
2473 }
2474
2475 #[rstest]
2476 fn total_count_of_document_with_relation_list_field(key_pair: KeyPair) {
2477 test_runner(|mut node: TestNode| async move {
2478 let (venues_schema, venues_view_ids) =
2479 create_venues_test_data(&mut node, &key_pair).await;
2480
2481 let (visited_schema, _) = create_visited_test_data(
2482 &mut node,
2483 venues_view_ids,
2484 venues_schema.to_owned(),
2485 &key_pair,
2486 )
2487 .await;
2488
2489 let args = Query::new(
2490 &Pagination::new(
2491 &NonZeroU64::new(25).unwrap(),
2492 None,
2493 &vec![PaginationField::TotalCount],
2494 ),
2495 &Select::new(&[Field::Meta(MetaField::DocumentId)]),
2496 &Filter::default(),
2497 &Order::default(),
2498 );
2499
2500 let (pagination_data, documents) = node
2501 .context
2502 .store
2503 .query(&visited_schema, &args, None)
2504 .await
2505 .expect("Query failed");
2506
2507 assert_eq!(documents.len(), 2);
2508 assert_eq!(pagination_data.total_count, Some(2));
2509 });
2510 }
2511
2512 #[rstest]
2513 fn select_cursor_during_conversion(schema_id: SchemaId) {
2514 let relation_list_hash = Hash::new_from_bytes(&[0]).to_string();
2515 let first_document_hash = Hash::new_from_bytes(&[1]).to_string();
2516 let second_document_hash = Hash::new_from_bytes(&[2]).to_string();
2517
2518 let root_cursor_1 = Hash::new_from_bytes(&[0, 1]).to_string();
2519 let root_cursor_2 = Hash::new_from_bytes(&[0, 2]).to_string();
2520 let cursor_1 = Hash::new_from_bytes(&[0, 3]).to_string();
2521 let cursor_2 = Hash::new_from_bytes(&[0, 4]).to_string();
2522 let cursor_3 = Hash::new_from_bytes(&[0, 5]).to_string();
2523 let cursor_4 = Hash::new_from_bytes(&[0, 6]).to_string();
2524
2525 let query_rows = vec![
2526 QueryRow {
2529 document_id: first_document_hash.clone(),
2530 document_view_id: first_document_hash.clone(),
2531 operation_id: first_document_hash.clone(),
2532 is_deleted: false,
2533 is_edited: false,
2534 root_cursor: root_cursor_1.clone(),
2537 cmp_value_cursor: cursor_1.clone(), owner: OptionalOwner::default(),
2543 name: "username".to_string(),
2544 value: Some("panda".to_string()),
2545 field_type: "str".to_string(),
2546 list_index: 0,
2547 },
2548 QueryRow {
2549 document_id: first_document_hash.clone(),
2550 document_view_id: first_document_hash.clone(),
2551 operation_id: first_document_hash.clone(),
2552 is_deleted: false,
2553 is_edited: false,
2554 root_cursor: root_cursor_1.clone(),
2555 cmp_value_cursor: cursor_2.clone(), owner: OptionalOwner::default(),
2557 name: "is_admin".to_string(),
2558 value: Some("false".to_string()),
2559 field_type: "bool".to_string(),
2560 list_index: 0,
2561 },
2562 QueryRow {
2565 document_id: second_document_hash.clone(),
2566 document_view_id: second_document_hash.clone(),
2567 operation_id: second_document_hash.clone(),
2568 is_deleted: false,
2569 is_edited: false,
2570 root_cursor: root_cursor_2.clone(),
2571 cmp_value_cursor: cursor_3.clone(), owner: OptionalOwner::default(),
2573 name: "username".to_string(),
2574 value: Some("penguin".to_string()),
2575 field_type: "str".to_string(),
2576 list_index: 0,
2577 },
2578 QueryRow {
2579 document_id: second_document_hash.clone(),
2580 document_view_id: second_document_hash.clone(),
2581 operation_id: second_document_hash.clone(),
2582 is_deleted: false,
2583 is_edited: false,
2584 root_cursor: root_cursor_2.clone(),
2585 cmp_value_cursor: cursor_4.clone(), owner: OptionalOwner::default(),
2587 name: "is_admin".to_string(),
2588 value: Some("true".to_string()),
2589 field_type: "bool".to_string(),
2590 list_index: 0,
2591 },
2592 ];
2593
2594 let result = convert_rows(
2600 query_rows.clone(),
2601 Some(&RelationList::new_unpinned(
2602 &relation_list_hash.parse().unwrap(),
2603 "relation_list_field",
2604 )),
2605 &vec!["username".to_string(), "is_admin".to_string()],
2606 &schema_id,
2607 );
2608
2609 assert_eq!(result.len(), 2);
2610
2611 assert_eq!(
2614 result[0].0,
2615 PaginationCursor::new(
2616 OperationCursor::from(cursor_2.as_str()),
2617 Some(OperationCursor::from(root_cursor_1.as_str())),
2618 Some(relation_list_hash.parse().unwrap())
2619 )
2620 );
2621 assert_eq!(
2622 result[1].0,
2623 PaginationCursor::new(
2624 OperationCursor::from(cursor_4.as_str()),
2625 Some(OperationCursor::from(root_cursor_2.as_str())),
2626 Some(relation_list_hash.parse().unwrap())
2627 )
2628 );
2629
2630 let result = convert_rows(
2634 query_rows,
2635 None,
2636 &vec!["username".to_string(), "is_admin".to_string()],
2637 &schema_id,
2638 );
2639
2640 assert_eq!(result.len(), 2);
2641 assert_eq!(
2642 result[0].0,
2643 PaginationCursor::new(OperationCursor::from(cursor_2.as_str()), None, None)
2644 );
2645 assert_eq!(
2646 result[1].0,
2647 PaginationCursor::new(OperationCursor::from(cursor_4.as_str()), None, None)
2648 );
2649 }
2650
2651 #[rstest]
2652 fn query_updated_documents_with_filter(
2653 #[from(populate_store_config)]
2654 #[with(2, 10, vec![KeyPair::new()], false, doggo_schema(), doggo_fields(),
2657 vec![("username", OperationValue::String("me".to_string()))]
2658 )]
2659 config: PopulateStoreConfig,
2660 ) {
2661 test_runner(|mut node: TestNode| async move {
2662 populate_and_materialize(&mut node, &config).await;
2664
2665 let schema = doggo_schema();
2666
2667 let documents = node
2668 .context
2669 .store
2670 .get_documents_by_schema(schema.id())
2671 .await
2672 .unwrap();
2673 assert_eq!(documents.len(), 10);
2674 for document in documents {
2675 if document.get("username").unwrap() != &OperationValue::String("me".into()) {
2676 panic!("All 'username' fields should have been updated to 'me'");
2677 }
2678 }
2679
2680 let args = Query::new(
2681 &Pagination::new(
2682 &NonZeroU64::new(5).unwrap(),
2683 None,
2684 &vec![PaginationField::TotalCount],
2685 ),
2686 &Select::new(&[
2687 Field::Meta(MetaField::DocumentId),
2688 Field::Field("username".into()),
2689 Field::Field("height".into()),
2690 Field::Field("age".into()),
2691 Field::Field("is_admin".into()),
2692 ]),
2693 &Filter::default().fields(&[("username", &["me".into()])]),
2694 &Order::default(),
2695 );
2696
2697 let (_pagination_data, documents) = node
2698 .context
2699 .store
2700 .query(&schema, &args, None)
2701 .await
2702 .expect("Query failed");
2703
2704 assert_eq!(documents.len(), 5);
2706 for (_cursor, document) in documents {
2707 assert_eq!(document.fields().unwrap().len(), 4);
2708 assert!(document.is_edited());
2709 }
2710 });
2711 }
2712}