aquadoggo/db/stores/
query.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3//! This module offers a query API to find many p2panda documents, filtered or sorted by custom
4//! parameters. The results are batched via cursor-based pagination.
5use std::collections::HashMap;
6use std::fmt::Display;
7use std::str::FromStr;
8
9use anyhow::bail;
10use p2panda_rs::document::DocumentViewId;
11use p2panda_rs::operation::OperationValue;
12use p2panda_rs::schema::{FieldName, Schema, SchemaId};
13use p2panda_rs::storage_provider::error::DocumentStorageError;
14use sqlx::query::QueryAs;
15use sqlx::query_as;
16
17use crate::db::models::utils::parse_document_view_field_rows;
18use crate::db::models::{DocumentViewFieldRow, QueryRow};
19use crate::db::query::{
20    ApplicationFields, Cursor, Direction, Field, Filter, FilterBy, FilterSetting, LowerBound,
21    MetaField, Order, Pagination, PaginationField, Select, UpperBound,
22};
23use crate::db::stores::OperationCursor;
24use crate::db::types::StorageDocument;
25use crate::db::{Pool, SqlStore};
26
27/// Configure query to select documents based on a relation list field.
28#[derive(Debug)]
29pub struct RelationList {
30    /// View id of document which holds relation list field.
31    pub root_view_id: DocumentViewId,
32
33    /// Field which contains the relation list values.
34    pub field: FieldName,
35
36    /// Type of relation list.
37    pub list_type: RelationListType,
38}
39
40#[derive(Debug)]
41pub enum RelationListType {
42    Pinned,
43    Unpinned,
44}
45
46impl RelationList {
47    pub fn new_unpinned(root_view_id: &DocumentViewId, field: &str) -> Self {
48        Self {
49            root_view_id: root_view_id.to_owned(),
50            field: field.to_string(),
51            list_type: RelationListType::Unpinned,
52        }
53    }
54
55    pub fn new_pinned(root_view_id: &DocumentViewId, field: &str) -> Self {
56        Self {
57            root_view_id: root_view_id.to_owned(),
58            field: field.to_string(),
59            list_type: RelationListType::Pinned,
60        }
61    }
62}
63
64/// Cursor aiding pagination, represented as a base58-encoded string.
65///
66/// The encoding ensures that the cursor stays "opaque", API consumers to not read any further
67/// semantic meaning into it, even though we keep some crucial information in it which help us
68/// internally during pagination.
69#[derive(Debug, Clone, Eq, PartialEq)]
70pub struct PaginationCursor {
71    /// Unique identifier aiding us to determine the current row.
72    pub operation_cursor: OperationCursor,
73
74    /// Unique identifier aiding us to determine the row of the parent document's relation list.
75    pub root_operation_cursor: Option<OperationCursor>,
76
77    /// In relation list queries we use this field to represent the parent document holding that
78    /// list.
79    pub root_view_id: Option<DocumentViewId>,
80}
81
82impl PaginationCursor {
83    pub fn new(
84        operation_cursor: OperationCursor,
85        root_operation_cursor: Option<OperationCursor>,
86        root_view_id: Option<DocumentViewId>,
87    ) -> Self {
88        assert!(root_operation_cursor.is_some() == root_view_id.is_some());
89
90        Self {
91            operation_cursor,
92            root_operation_cursor,
93            root_view_id,
94        }
95    }
96}
97
98impl Display for PaginationCursor {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "{}", self.encode())
101    }
102}
103
104// This should _not_ be an underscore character since we're also parsing document view ids in the
105// string which contain that character already
106const CURSOR_SEPARATOR: char = '-';
107
108impl Cursor for PaginationCursor {
109    type Error = anyhow::Error;
110
111    fn decode(encoded: &str) -> Result<Self, Self::Error> {
112        let bytes = bs58::decode(encoded).into_vec()?;
113        let decoded = std::str::from_utf8(&bytes)?;
114
115        let parts: Vec<&str> = decoded.split(CURSOR_SEPARATOR).collect();
116        match parts.len() {
117            1 => Ok(Self::new(OperationCursor::from(parts[0]), None, None)),
118            3 => Ok(Self::new(
119                OperationCursor::from(parts[0]),
120                Some(OperationCursor::from(parts[1])),
121                Some(DocumentViewId::from_str(parts[2])?),
122            )),
123            _ => {
124                bail!("Invalid amount of cursor parts");
125            }
126        }
127    }
128
129    fn encode(&self) -> String {
130        bs58::encode(
131            format!(
132                "{}{}",
133                self.operation_cursor,
134                self.root_view_id
135                    .as_ref()
136                    .map_or("".to_string(), |view_id| format!(
137                        "{}{}{}{}",
138                        CURSOR_SEPARATOR,
139                        self.root_operation_cursor
140                            .as_ref()
141                            .expect("Expect root_operation to be set when root_view_id is as well"),
142                        CURSOR_SEPARATOR,
143                        view_id
144                    ))
145            )
146            .as_bytes(),
147        )
148        .into_string()
149    }
150}
151
152#[derive(Default, Clone, Debug)]
153pub struct PaginationData<C>
154where
155    C: Cursor,
156{
157    /// Number of all documents in queried collection.
158    pub total_count: Option<u64>,
159
160    /// Flag indicating if `endCursor` will return another page.
161    pub has_next_page: bool,
162
163    /// Flag indicating if `startCursor` will return another page.
164    pub has_previous_page: bool,
165
166    /// Cursor which can be used to paginate backwards.
167    pub start_cursor: Option<C>,
168
169    /// Cursor which can be used to paginate forwards.
170    pub end_cursor: Option<C>,
171}
172
173pub type QueryResponse = (
174    PaginationData<PaginationCursor>,
175    Vec<(PaginationCursor, StorageDocument)>,
176);
177
178/// Query configuration to determine pagination cursor, selected fields, filters and order of
179/// results.
180#[derive(Debug, Clone)]
181pub struct Query<C>
182where
183    C: Cursor,
184{
185    pub pagination: Pagination<C>,
186    pub select: Select,
187    pub filter: Filter,
188    pub order: Order,
189}
190
191impl<C> Query<C>
192where
193    C: Cursor,
194{
195    pub fn new(
196        pagination: &Pagination<C>,
197        select: &Select,
198        filter: &Filter,
199        order: &Order,
200    ) -> Self {
201        Self {
202            pagination: pagination.clone(),
203            select: select.clone(),
204            filter: filter.clone(),
205            order: order.clone(),
206        }
207    }
208}
209
210/// Helper method to determine the field type of the given field by looking at the schema and
211/// derive a SQL type cast function from it.
212fn typecast_field_sql(
213    sql_field: &str,
214    field_name: &str,
215    schema: &Schema,
216    case_sensitive: bool,
217) -> String {
218    let field_type = schema
219        .fields()
220        .iter()
221        .find_map(|(schema_field_name, field_type)| {
222            if schema_field_name == field_name {
223                Some(field_type)
224            } else {
225                None
226            }
227        })
228        // We expect that at this stage we only deal with fields which really exist in the schema,
229        // everything has been validated before
230        .unwrap_or_else(|| panic!("Field '{}' not given in Schema", field_name));
231
232    match field_type {
233        p2panda_rs::schema::FieldType::Integer => {
234            format!("CAST ({sql_field} AS INTEGER)")
235        }
236        p2panda_rs::schema::FieldType::Float => {
237            format!("CAST ({sql_field} AS REAL)")
238        }
239        // All other types (booleans, relations, etc.) we keep as strings. We can not convert
240        // booleans easily as they don't have their own datatype in SQLite
241        _ => {
242            if case_sensitive {
243                sql_field.to_string()
244            } else {
245                format!("LOWER({sql_field})")
246            }
247        }
248    }
249}
250
251/// Values to bind to SQL query.
252#[derive(Debug)]
253enum BindArgument {
254    String(String),
255    Integer(i64),
256    Float(f64),
257}
258
259/// Helper method to bind untrusted arguments to a sqlx `QueryAs` instance.
260fn bind_to_query<'q, O>(
261    mut query: QueryAs<'q, sqlx::Any, O, sqlx::any::AnyArguments<'q>>,
262    args: &'q Vec<BindArgument>,
263) -> QueryAs<'q, sqlx::Any, O, sqlx::any::AnyArguments<'q>> {
264    for arg in args {
265        query = match arg {
266            BindArgument::String(value) => query.bind(value),
267            BindArgument::Integer(value) => query.bind(value),
268            BindArgument::Float(value) => query.bind(value),
269        };
270    }
271
272    query
273}
274
275/// Helper method to convert operation values into a bindable argument representation for SQL.
276fn bind_arg(value: &OperationValue) -> Vec<BindArgument> {
277    match &value {
278        // Note that we wrap boolean values into a string here, as our operation field values are
279        // stored as strings in themselves and we can't typecast them to booleans due to a
280        // limitation in SQLite (see typecast_field_sql method).
281        //
282        // When comparing meta fields like `is_edited` etc. do _not_ use this method since we're
283        // dealing there with native boolean values instead of strings.
284        OperationValue::Boolean(value) => vec![BindArgument::String(
285            (if *value { "true" } else { "false" }).to_string(),
286        )],
287        OperationValue::Integer(value) => vec![BindArgument::Integer(value.to_owned())],
288        OperationValue::Float(value) => vec![BindArgument::Float(value.to_owned())],
289        OperationValue::String(value) => vec![BindArgument::String(value.to_owned())],
290        OperationValue::Relation(value) => {
291            vec![BindArgument::String(value.document_id().to_string())]
292        }
293        OperationValue::RelationList(value) => value
294            .iter()
295            .map(|document_id| BindArgument::String(document_id.to_string()))
296            .collect(),
297        OperationValue::PinnedRelation(value) => {
298            vec![BindArgument::String(value.view_id().to_string())]
299        }
300        OperationValue::PinnedRelationList(value) => value
301            .iter()
302            .map(|view_id| BindArgument::String(view_id.to_string()))
303            .collect(),
304        OperationValue::Bytes(value) => vec![BindArgument::String(hex::encode(value))],
305    }
306}
307
308/// Helper method to convert filter settings into SQL comparison operations.
309///
310/// Returns the SQL comparison string and adds binding arguments to the mutable array we pass into
311/// this function.
312fn cmp_sql(
313    sql_field: &str,
314    filter_setting: &FilterSetting,
315    args: &mut Vec<BindArgument>,
316) -> String {
317    match &filter_setting.by {
318        FilterBy::Element(value) => {
319            args.append(&mut bind_arg(value));
320
321            if !filter_setting.exclusive {
322                format!("{sql_field} = ${}", args.len())
323            } else {
324                format!("{sql_field} != ${}", args.len())
325            }
326        }
327        FilterBy::Set(values_vec) => {
328            let args_sql = values_vec
329                .iter()
330                .map(|value| {
331                    args.append(&mut bind_arg(value));
332                    format!("${}", args.len())
333                })
334                .collect::<Vec<String>>()
335                .join(",");
336
337            if !filter_setting.exclusive {
338                format!("{sql_field} IN ({})", args_sql)
339            } else {
340                format!("{sql_field} NOT IN ({})", args_sql)
341            }
342        }
343        FilterBy::Interval(lower_value, upper_value) => {
344            let mut values: Vec<String> = Vec::new();
345
346            match lower_value {
347                LowerBound::Unbounded => (),
348                LowerBound::Greater(value) => {
349                    args.append(&mut bind_arg(value));
350                    values.push(format!("{sql_field} > ${}", args.len()));
351                }
352                LowerBound::GreaterEqual(value) => {
353                    args.append(&mut bind_arg(value));
354                    values.push(format!("{sql_field} >= ${}", args.len()));
355                }
356            }
357
358            match upper_value {
359                UpperBound::Unbounded => (),
360                UpperBound::Lower(value) => {
361                    args.append(&mut bind_arg(value));
362                    values.push(format!("{sql_field} < ${}", args.len()));
363                }
364                UpperBound::LowerEqual(value) => {
365                    args.append(&mut bind_arg(value));
366                    values.push(format!("{sql_field} <= ${}", args.len()));
367                }
368            }
369
370            values.join(" AND ")
371        }
372        FilterBy::Contains(OperationValue::String(value)) => {
373            args.push(BindArgument::String(format!("%{value}%")));
374
375            if !filter_setting.exclusive {
376                format!("{sql_field} LIKE ${}", args.len())
377            } else {
378                format!("{sql_field} NOT LIKE ${}", args.len())
379            }
380        }
381        _ => panic!("Unsupported filter"),
382    }
383}
384
/// Joins all given SQL strings into one, separated by a comma and a space. Items which are
/// `None` are skipped.
fn concatenate_sql(items: &[Option<String>]) -> String {
    let present: Vec<&str> = items.iter().flatten().map(String::as_str).collect();
    present.join(", ")
}
393
394/// Returns SQL to filter documents.
395///
396/// Since filters are the only place which can contain untrusted user values we are building the
397/// SQL query with positional arguments and bind the values to them. This helps sanitization of all
398/// values and prevents potential SQL injection attacks.
399fn where_filter_sql(filter: &Filter, schema: &Schema) -> (String, Vec<BindArgument>) {
400    let mut args: Vec<BindArgument> = Vec::new();
401
402    let sql = filter
403        .iter()
404        .filter_map(|filter_setting| {
405            match &filter_setting.field {
406                Field::Meta(MetaField::Owner) => {
407                    let filter_cmp = cmp_sql("operations_v1.public_key", filter_setting, &mut args);
408
409                    Some(format!(
410                        r#"
411                        AND EXISTS (
412                            SELECT
413                                operations_v1.public_key
414                            FROM
415                                operations_v1
416                            WHERE
417                                operations_v1.operation_id = documents.document_id
418                                AND
419                                    {filter_cmp}
420                        )
421                        "#
422                    ))
423                }
424                Field::Meta(MetaField::Edited) => {
425                    if let FilterBy::Element(OperationValue::Boolean(filter_value)) =
426                        filter_setting.by
427                    {
428                        // Convert the boolean value manually here since we're dealing with native
429                        // boolean values instead of operation field value strings
430                        let edited_flag = if filter_value { "true" } else { "false" };
431                        Some(format!("AND is_edited = {edited_flag}"))
432                    } else {
433                        None
434                    }
435                }
436                Field::Meta(MetaField::Deleted) => {
437                    if let FilterBy::Element(OperationValue::Boolean(filter_value)) =
438                        filter_setting.by
439                    {
440                        // Convert the boolean value manually here since we're dealing with native
441                        // boolean values instead of operation field value strings
442                        let deleted_flag = if filter_value { "true" } else { "false" };
443                        Some(format!("AND documents.is_deleted = {deleted_flag}"))
444                    } else {
445                        None
446                    }
447                }
448                Field::Meta(MetaField::DocumentId) => Some(format!(
449                    "AND {}",
450                    cmp_sql("documents.document_id", filter_setting, &mut args)
451                )),
452                Field::Meta(MetaField::DocumentViewId) => Some(format!(
453                    "AND {}",
454                    cmp_sql("documents.document_view_id", filter_setting, &mut args)
455                )),
456                Field::Field(field_name) => {
457                    let field_sql = typecast_field_sql("operation_fields_v1.value", field_name, schema, true);
458                    let filter_cmp = cmp_sql(&field_sql, filter_setting, &mut args);
459
460                    Some(format!(
461                        r#"
462                        AND EXISTS (
463                            SELECT
464                                operation_fields_v1.value
465                            FROM
466                                document_view_fields AS document_view_fields_subquery
467                                JOIN operation_fields_v1
468                                    ON
469                                        document_view_fields_subquery.operation_id = operation_fields_v1.operation_id
470                                    AND
471                                        document_view_fields_subquery.name = operation_fields_v1.name
472                            WHERE
473                                -- Match document_view_fields of this subquery with the parent one
474                                document_view_fields.document_view_id = document_view_fields_subquery.document_view_id
475
476                                -- Check if this document view fullfils this filter
477                                AND operation_fields_v1.name = '{field_name}'
478                                AND
479                                    {filter_cmp}
480                                AND
481                                    operation_fields_v1.operation_id = document_view_fields_subquery.operation_id
482                        )
483                        "#
484                    ))
485                }
486            }
487        })
488        .collect::<Vec<String>>()
489        .join("\n");
490
491    (sql, args)
492}
493
494/// Generate SQL for cursor-based pagination.
495///
496/// Read more about cursor-based pagination here:
497/// https://brunoscheufler.com/blog/2022-01-01-paginating-large-ordered-datasets-with-cursor-based-pagination
498///
499/// ## Cursors
500///
501/// Our implementation follows the principle mentioned in that article, with a couple of
502/// specialities due to our SQL table layout:
503///
504/// * We don't have auto incrementing `id` but `cursor` fields
505/// * We can have duplicate, multiple document id and view id values since one row represents only
506/// one document field. A document can consist of many fields. So pagination over document id's or
507/// view id's is non-trivial and needs extra aid from our `cursor`
508/// * Cursors _always_ need to point at the _last_ field of each document. This is assured by the
509/// `convert_rows` method which returns that cursor to the client via the GraphQL response
510///
511/// ## Ordering
512///
513/// Pagination is strictly connected to the chosen ordering by the client of the results. We need
514/// to take the ordering into account to understand which "next page" to show.
515///
516/// ## Pre-Queries
517///
518/// This method is async as it does some smaller "pre" SQL queries before the "main" query. This is
519/// an optimization over the fact that cursors sometimes point at values which stay the same for
520/// each SQL sub-SELECT, so we just do this query once and pass the values over into the "main"
521/// query.
522async fn where_pagination_sql(
523    pool: &Pool,
524    bind_args: &mut Vec<BindArgument>,
525    pagination: &Pagination<PaginationCursor>,
526    fields: &ApplicationFields,
527    list: Option<&RelationList>,
528    schema: &Schema,
529    order: &Order,
530) -> Result<String, DocumentStorageError> {
531    // No pagination cursor was given
532    if pagination.after.is_none() {
533        return Ok("".to_string());
534    }
535
536    // Ignore pagination if we're in a relation list query and the cursor does not match the parent
537    // document view id
538    if let (Some(relation_list), Some(cursor)) = (list, pagination.after.as_ref()) {
539        if Some(&relation_list.root_view_id) != cursor.root_view_id.as_ref() {
540            return Ok("".to_string());
541        }
542    }
543
544    // Unwrap as we know now that a cursor exists at this point
545    let cursor = pagination
546        .after
547        .as_ref()
548        .expect("Expect cursor to be set at this point");
549    let operation_cursor = &cursor.operation_cursor;
550
551    let cursor_sql = match list {
552        Some(_) => {
553            let root_cursor = cursor
554                .root_operation_cursor
555                .as_ref()
556                .expect("Expect root_operation_cursor to be set when querying relation list");
557
558            format!("operation_fields_v1_list.cursor > '{root_cursor}'")
559        }
560        None => {
561            format!("operation_fields_v1.cursor > '{operation_cursor}'")
562        }
563    };
564
565    let cmp_direction = match order.direction {
566        Direction::Ascending => ">",
567        Direction::Descending => "<",
568    };
569
570    match &order.field {
571        // 1. No ordering has been chosen by the client
572        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
573        //
574        // We order by default which is either:
575        //
576        // a) Simply paginate over document view ids, from the view where the cursor points at
577        // b) If we're in a relation list, we paginate over the list index values, from where the
578        // root_cursor points at
579        None => match list {
580            None => {
581                // Collection of all documents following a certain schema id:
582                //
583                // -------------------------------------------------
584                // document_id | view_id | field_name | ... | cursor
585                // -------------------------------------------------
586                // 0x01        | 0x01    | username   | ... | 0xa0
587                // 0x01        | 0x01    | message    | ... | 0xc2   <--- Select
588                // 0x02        | 0x02    | username   | ... | 0x06
589                // 0x02        | 0x02    | message    | ... | 0x8b
590                // -------------------------------------------------
591                //
592                // -> Select document_view_id at cursor 0xc2
593                // -> Show results from document_view_id > 0x01
594                let cmp_value_pre = format!(
595                    r#"
596                    SELECT
597                        document_view_fields.document_view_id
598                    FROM
599                        operation_fields_v1
600                        JOIN document_view_fields
601                            ON operation_fields_v1.operation_id = document_view_fields.operation_id
602                    WHERE
603                        operation_fields_v1.cursor = '{operation_cursor}'
604                    LIMIT 1
605                    "#
606                );
607
608                // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same
609                // result
610                let document_view_id: (String,) = query_as(&cmp_value_pre)
611                    .fetch_one(pool)
612                    .await
613                    .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
614
615                Ok(format!(
616                    "AND documents.document_view_id > '{}'",
617                    document_view_id.0
618                ))
619            }
620            Some(_) => {
621                // All documents mentioned in a root document's relation list, note that duplicates
622                // are possible in relation lists:
623                //
624                // --------------------------------------------------||-------------------------
625                // Documents from relation list                      || Relation list
626                // --------------------------------------------------||-------------------------
627                // document_id | view_id | field_name | ... | cursor || list_index | root_cursor
628                // --------------------------------------------------||-------------------------
629                // 0x03        | 0x03    | username   | ... | 0x99   || 0          | 0x54  <--- Select
630                // 0x03        | 0x03    | message    | ... | 0xc2   || 0          | 0x54
631                // 0x01        | 0x01    | username   | ... | 0xcd   || 1          | 0x8a
632                // 0x01        | 0x01    | message    | ... | 0xea   || 1          | 0x8a
633                // --------------------------------------------------||-------------------------
634                //
635                // -> Select list_index of relation list at root_cursor 0x54
636                // -> Show results from list_index > 0
637                let root_cursor = cursor
638                    .root_operation_cursor
639                    .as_ref()
640                    .expect("Expect root_operation_cursor to be set when querying relation list");
641
642                let cmp_value_pre = format!(
643                    r#"
644                    -- When ordering is activated we need to compare against the value
645                    -- of the ordered field - but from the row where the cursor points at
646                    SELECT
647                        operation_fields_v1.list_index
648                    FROM
649                        operation_fields_v1
650                    WHERE
651                        operation_fields_v1.cursor = '{root_cursor}'
652                    "#
653                );
654
655                // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same
656                // result
657                let list_index: (i32,) = query_as(&cmp_value_pre)
658                    .fetch_one(pool)
659                    .await
660                    .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
661
662                // List indexes are always unique so we can simply just compare them like that
663                Ok(format!(
664                    "AND operation_fields_v1_list.list_index > {}",
665                    list_index.0
666                ))
667            }
668        },
669
670        // 2. Ordering over a meta field
671        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
672        //
673        // We select the meta data from the document the cursor points at and use it to paginate
674        // over.
675        Some(Field::Meta(meta_field)) => {
676            let (cmp_value_pre, cmp_field) = match meta_field {
677                MetaField::DocumentId => {
678                    // Select document_id of operation where the cursor points at
679                    let cmp_value = format!(
680                        r#"
681                        SELECT
682                            operations_v1.document_id
683                        FROM
684                            operation_fields_v1
685                            JOIN operations_v1
686                                ON operation_fields_v1.operation_id = operations_v1.operation_id
687                        WHERE
688                            operation_fields_v1.cursor = '{operation_cursor}'
689                        LIMIT 1
690                        "#
691                    );
692
693                    (cmp_value, "documents.document_id")
694                }
695                MetaField::DocumentViewId => {
696                    // Select document_view_id of operation where the cursor points at
697                    let cmp_value = format!(
698                        r#"
699                        SELECT
700                            document_view_fields.document_view_id
701                        FROM
702                            operation_fields_v1
703                            JOIN document_view_fields
704                                ON operation_fields_v1.operation_id = document_view_fields.operation_id
705                        WHERE
706                            operation_fields_v1.cursor = '{operation_cursor}'
707                        LIMIT 1
708                        "#
709                    );
710
711                    (cmp_value, "documents.document_view_id")
712                }
713                MetaField::Owner | MetaField::Edited | MetaField::Deleted => {
714                    // @TODO: See issue: https://github.com/p2panda/aquadoggo/issues/326
715                    todo!("Not implemented");
716                }
717            };
718
719            // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same
720            // result
721            let cmp_value: (String,) = query_as(&cmp_value_pre)
722                .fetch_one(pool)
723                .await
724                .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
725            let cmp_value = format!("'{}'", cmp_value.0);
726
727            if fields.is_empty() && list.is_none() {
728                // If:
729                //
730                // 1. No application fields have been selected by the client
731                // 2. We're not paginating over a relation list
732                //
733                // .. then we can do a simplification of the query, since the results were grouped
734                // by documents and we can be sure that for each row the document id and view are
735                // unique.
736                Ok(format!("AND {cmp_field} {cmp_direction} {cmp_value}"))
737            } else {
738                // Cursor-based pagination
739                Ok(format!(
740                    r#"
741                    AND (
742                        {cmp_field} {cmp_direction} {cmp_value}
743                        OR
744                        (
745                            {cmp_field} = {cmp_value}
746                            AND
747                                {cursor_sql}
748                        )
749                    )
750                    "#
751                ))
752            }
753        }
754
755        // 3. Ordering over an application field
756        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
757        //
758        // Cursors are always pointing at the last field of a document. In the following example
759        // this is the "message" field. Since we've ordered the results by "timestamp" we need to
760        // manually find out what this value is by tracing back the cursor to the document to that
761        // ordered field.
762        //
763        // Collection of all documents, ordered by "timestamp", following a certain schema id:
764        //
765        // -------------------------------------------------
766        // document_id | view_id | field_name | ... | cursor
767        // -------------------------------------------------
768        // 0x01        | 0x01    | username   | ... | 0xa0
769        // 0x01        | 0x01    | timestamp  | ... | 0x72 <-- Compare
770        // 0x01        | 0x01    | message    | ... | 0xc2 <-- Select
771        // 0x02        | 0x02    | username   | ... | 0x06
772        // 0x02        | 0x02    | timestamp  | ... | 0x8f
773        // 0x02        | 0x02    | message    | ... | 0x8b
774        // -------------------------------------------------
775        //
776        // -> Select document_view_id where cursor points at
777        // -> Find "timestamp" field of that document and use this value
778        // -> Show results from "timestamp" value > other "timestamp" values
779        Some(Field::Field(order_field_name)) => {
780            // Select the value we want to compare with from the document the cursor is pointing
781            // at. This is the value which we also order the whole results by
782            let cmp_value_pre = format!(
783                r#"
784                SELECT
785                    operation_fields_v1.value
786
787                FROM
788                    operation_fields_v1
789                    LEFT JOIN
790                        document_view_fields
791                        ON document_view_fields.operation_id = operation_fields_v1.operation_id
792
793                WHERE
794                    document_view_fields.document_view_id = (
795                        SELECT
796                            document_view_fields.document_view_id
797
798                        FROM
799                            operation_fields_v1
800                            LEFT JOIN
801                                document_view_fields
802                                ON document_view_fields.operation_id = operation_fields_v1.operation_id
803
804                        WHERE
805                            operation_fields_v1.cursor = '{operation_cursor}'
806
807                        LIMIT 1
808                    )
809                    AND operation_fields_v1.name = '{order_field_name}'
810
811                LIMIT 1
812                "#
813            );
814
815            // Make a "pre" SQL query to avoid duplicate sub SELECT's always returning the same
816            // result.
817            //
818            // The returned value is added to the bindable arguments array since this is untrusted
819            // user content.
820            let operation_fields_value: (String,) = query_as(&cmp_value_pre)
821                .fetch_one(pool)
822                .await
823                .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
824            bind_args.push(BindArgument::String(operation_fields_value.0));
825
826            // Necessary casting for operation values of different type
827            let cmp_field =
828                typecast_field_sql("operation_fields_v1.value", order_field_name, schema, false);
829
830            let bind_arg_marker = typecast_field_sql(
831                &format!("${}", bind_args.len()),
832                order_field_name,
833                schema,
834                false,
835            );
836
837            // Cursor-based pagination
838            Ok(format!(
839                r#"
840                AND EXISTS (
841                    SELECT
842                        operation_fields_v1.value
843                    FROM
844                        operation_fields_v1
845                        LEFT JOIN document_view_fields
846                            ON operation_fields_v1.operation_id = document_view_fields.operation_id
847                    WHERE
848                        operation_fields_v1.name = '{order_field_name}'
849                        AND
850                            document_view_fields.document_view_id = documents.document_view_id
851                        AND
852                            (
853                                {cmp_field} {cmp_direction} {bind_arg_marker}
854                                OR
855                                (
856                                    {cmp_field} = {bind_arg_marker}
857                                    AND
858                                        {cursor_sql}
859                                )
860                            )
861                )
862                "#
863            ))
864        }
865    }
866}
867
868fn order_sql(
869    order: &Order,
870    schema: &Schema,
871    list: Option<&RelationList>,
872    fields: &ApplicationFields,
873) -> String {
874    // Create custom ordering if query set one
875    let custom = order
876        .field
877        .as_ref()
878        .map(|field| match field {
879            Field::Meta(MetaField::DocumentId) => "documents.document_id".to_string(),
880            Field::Meta(MetaField::DocumentViewId) => "documents.document_view_id".to_string(),
881            Field::Meta(MetaField::Owner) => "owner".to_string(),
882            Field::Meta(MetaField::Edited) => "is_edited".to_string(),
883            Field::Meta(MetaField::Deleted) => "is_deleted".to_string(),
884            Field::Field(field_name) => {
885                format!(
886                    r#"
887                    (
888                        SELECT
889                            {}
890                        FROM
891                            operation_fields_v1
892                            LEFT JOIN document_view_fields
893                                ON operation_fields_v1.operation_id = document_view_fields.operation_id
894                        WHERE
895                            operation_fields_v1.name = '{}'
896                            AND document_view_fields.document_view_id = documents.document_view_id
897                        LIMIT 1
898                    )
899                    "#,
900                    typecast_field_sql("operation_fields_v1.value", field_name, schema, false),
901                    field_name,
902                )
903            }
904        })
905        .map(|field| {
906            let direction = match order.direction {
907                Direction::Ascending => "ASC",
908                Direction::Descending => "DESC",
909            };
910
911            format!("{field} {direction}")
912        });
913
914    // .. and by relation list index, in case we're querying one
915    let list_sql = match (&order.field, list) {
916        (None, Some(_)) => Some("operation_fields_v1_list.list_index ASC".to_string()),
917        _ => None,
918    };
919
920    // Order by document view id, except of if it was selected manually. We need this to assemble
921    // the rows to documents correctly at the end
922    let id_sql = {
923        if fields.len() > 1 {
924            match order.field {
925                Some(Field::Meta(MetaField::DocumentViewId)) => None,
926                _ => Some("documents.document_view_id ASC".to_string()),
927            }
928        } else {
929            // Skip this step when only _one_ field was selected for this query to not mess with
930            // pagination.
931            //
932            // We're relying on the "cursor" being the tie-breaker for duplicates. If we would
933            // still order by document view id, this would become undesirably our tie-breaker
934            // instead - as every field is automatically another unique document when only one was
935            // selected hence we will never order by cursor on top of that.
936            //
937            // When no field was selected we just skip this ordering to simplify the query.
938            None
939        }
940    };
941
942    // On top we sort always by the unique operation cursor in case the previous order value is
943    // equal between two rows
944    let cursor_sql = match list {
945        Some(_) => Some("operation_fields_v1_list.cursor ASC".to_string()),
946        None => Some("operation_fields_v1.cursor ASC".to_string()),
947    };
948
949    let order = concatenate_sql(&[custom, list_sql, id_sql, cursor_sql]);
950
951    format!("ORDER BY {order}")
952}
953
954fn limit_sql<C>(pagination: &Pagination<C>, fields: &ApplicationFields) -> (u64, String)
955where
956    C: Cursor,
957{
958    // We multiply the value by the number of fields we selected. If no fields have been selected
959    // we just take the page size as is
960    let page_size = pagination.first.get() * std::cmp::max(1, fields.len() as u64);
961
962    // ... and add + 1 for the "has next page" flag
963    (page_size, format!("LIMIT {page_size} + 1"))
964}
965
966fn where_fields_sql(fields: &ApplicationFields) -> String {
967    if fields.is_empty() {
968        "".to_string()
969    } else {
970        let fields_sql: Vec<String> = fields.iter().map(|field| format!("'{field}'")).collect();
971        format!(
972            "AND operation_fields_v1.name IN ({})",
973            fields_sql.join(", ")
974        )
975    }
976}
977
978fn select_edited_sql(select: &Select) -> Option<String> {
979    if select.fields.contains(&Field::Meta(MetaField::Edited)) {
980        let sql = r#"
981        -- Check if there is more operations next to the initial "create" operation
982        (
983            SELECT
984                operations_v1.public_key
985            FROM
986                operations_v1
987            WHERE
988                operations_v1.action != "create"
989                AND
990                    operations_v1.document_id = documents.document_id
991            LIMIT 1
992        ) AS is_edited
993        "#
994        .to_string();
995
996        Some(sql)
997    } else {
998        None
999    }
1000}
1001
1002fn select_owner_sql(select: &Select) -> Option<String> {
1003    if select.fields.contains(&Field::Meta(MetaField::Owner)) {
1004        // The original owner of a document we get by checking which public key signed the
1005        // "create" operation, the hash of that operation is the same as the document id
1006        let sql = r#"
1007        (
1008            SELECT
1009                operations_v1.public_key
1010            FROM
1011                operations_v1
1012            WHERE
1013                operations_v1.operation_id = documents.document_id
1014        ) AS owner
1015        "#
1016        .to_string();
1017
1018        Some(sql)
1019    } else {
1020        None
1021    }
1022}
1023
1024fn select_fields_sql(fields: &ApplicationFields) -> Vec<Option<String>> {
1025    let mut select = Vec::new();
1026
1027    if !fields.is_empty() {
1028        // We get the application data by selecting the name, value and type
1029        select.push(Some("operation_fields_v1.name".to_string()));
1030        select.push(Some("operation_fields_v1.value".to_string()));
1031        select.push(Some("operation_fields_v1.field_type".to_string()));
1032    }
1033
1034    select
1035}
1036
1037fn select_cursor_sql(list: Option<&RelationList>) -> Vec<Option<String>> {
1038    match list {
1039        Some(_) => {
1040            vec![
1041                Some("operation_fields_v1.cursor AS cmp_value_cursor".to_string()),
1042                Some("operation_fields_v1_list.cursor AS root_cursor".to_string()),
1043            ]
1044        }
1045        None => vec![Some(
1046            "operation_fields_v1.cursor AS cmp_value_cursor".to_string(),
1047        )],
1048    }
1049}
1050
1051fn where_sql(schema: &Schema, fields: &ApplicationFields, list: Option<&RelationList>) -> String {
1052    let schema_id = schema.id();
1053
1054    // Only one row per field: restrict relation lists to first list item
1055    let list_index_sql = "AND operation_fields_v1.list_index = 0";
1056
1057    // Always select at least one field, even if user didn't select one. If we wouldn't select a
1058    // field we would potentially receive multiple rows per document even though when we're only
1059    // interested in one per row.
1060    let extra_field_select = if fields.is_empty() {
1061        let (field_name, _) = schema.fields().iter().next().unwrap();
1062        format!("AND operation_fields_v1.name = '{field_name}'")
1063    } else {
1064        "".to_string()
1065    };
1066
1067    match list {
1068        None => {
1069            // Filter by the queried schema of that collection
1070            format!(
1071                r#"
1072                documents.schema_id = '{schema_id}'
1073                {list_index_sql}
1074                {extra_field_select}
1075                "#
1076            )
1077        }
1078        Some(relation_list) => {
1079            // Filter by the parent document view id of this relation list
1080            let field_name = &relation_list.field;
1081            let root_view_id = &relation_list.root_view_id;
1082            let field_type = match relation_list.list_type {
1083                RelationListType::Pinned => "pinned_relation_list",
1084                RelationListType::Unpinned => "relation_list",
1085            };
1086
1087            format!(
1088                r#"
1089                document_view_fields_list.document_view_id = '{root_view_id}'
1090                AND
1091                    operation_fields_v1_list.field_type = '{field_type}'
1092                AND
1093                    operation_fields_v1_list.name = '{field_name}'
1094                {list_index_sql}
1095                {extra_field_select}
1096                "#
1097            )
1098        }
1099    }
1100}
1101
1102fn from_sql(list: Option<&RelationList>) -> String {
1103    match list {
1104        Some(relation_list) => {
1105            let filter_sql = match relation_list.list_type {
1106                RelationListType::Pinned => "documents.document_view_id",
1107                RelationListType::Unpinned => "documents.document_id",
1108            };
1109
1110            format!(
1111                r#"
1112                -- Select relation list of parent document first ..
1113                document_view_fields document_view_fields_list
1114                JOIN operation_fields_v1 operation_fields_v1_list
1115                    ON
1116                        document_view_fields_list.operation_id = operation_fields_v1_list.operation_id
1117                    AND
1118                        document_view_fields_list.name = operation_fields_v1_list.name
1119
1120                -- .. and join the related documents afterwards
1121                JOIN documents
1122                    ON
1123                        operation_fields_v1_list.value = {filter_sql}
1124                JOIN document_view_fields
1125                    ON documents.document_view_id = document_view_fields.document_view_id
1126                "#
1127            )
1128        }
1129        // Otherwise just query the documents directly
1130        None => r#"
1131            documents
1132            JOIN document_view_fields
1133                ON documents.document_view_id = document_view_fields.document_view_id
1134        "#
1135        .to_string(),
1136    }
1137}
1138
1139impl SqlStore {
1140    /// Returns a paginated collection of documents from the database which can be filtered and
1141    /// ordered by custom parameters.
1142    ///
1143    /// When passing a `list` configuration the query will run against the documents of a (pinned
1144    /// and unpinned) relation list instead.
1145    pub async fn query(
1146        &self,
1147        schema: &Schema,
1148        args: &Query<PaginationCursor>,
1149        list: Option<&RelationList>,
1150    ) -> Result<QueryResponse, DocumentStorageError> {
1151        // Get all selected application fields from query
1152        let application_fields = args.select.application_fields();
1153
1154        // Generate SQL based on the given schema and query
1155        let mut select_vec = vec![
1156            // We get all the meta informations first, let's start with the document id, document
1157            // view id and id of the operation which holds the data
1158            Some("documents.document_id".to_string()),
1159            Some("documents.document_view_id".to_string()),
1160            Some("document_view_fields.operation_id".to_string()),
1161            // The deletion status of a document we already store in the database, let's select it
1162            // in any case since we get it for free
1163            Some("documents.is_deleted".to_string()),
1164        ];
1165
1166        // .. also the unique cursor is very important, it will help us with cursor-based
1167        // pagination, especially over relation lists with duplicate documents
1168        select_vec.append(&mut select_cursor_sql(list));
1169
1170        // All other fields we optionally select depending on the query
1171        select_vec.push(select_edited_sql(&args.select));
1172        select_vec.push(select_owner_sql(&args.select));
1173        select_vec.append(&mut select_fields_sql(&application_fields));
1174
1175        let select = concatenate_sql(&select_vec);
1176
1177        let from = from_sql(list);
1178
1179        let where_ = where_sql(schema, &application_fields, list);
1180        let and_fields = where_fields_sql(&application_fields);
1181        let (and_filters, mut bind_args) = where_filter_sql(&args.filter, schema);
1182        let and_pagination = where_pagination_sql(
1183            &self.pool,
1184            &mut bind_args,
1185            &args.pagination,
1186            &application_fields,
1187            list,
1188            schema,
1189            &args.order,
1190        )
1191        .await?;
1192
1193        let order = order_sql(&args.order, schema, list, &application_fields);
1194        let (page_size, limit) = limit_sql(&args.pagination, &application_fields);
1195
1196        let sea_quel = format!(
1197            r#"
1198            -- ░░░░░▄▀▀▀▄░░░░░░░░░░░░░░░░░
1199            -- ▄███▀░◐░░░▌░░░░░░░░░░░░░░░░
1200            -- ░░░░▌░░░░░▐░░░░░░░░░░░░░░░░
1201            -- ░░░░▐░░░░░▐░░░░░░░░░░░░░░░░
1202            -- ░░░░▌░░░░░▐▄▄░░░░░░░░░░░░░░
1203            -- ░░░░▌░░░░▄▀▒▒▀▀▀▀▄░░░░░░░░░
1204            -- ░░░▐░░░░▐▒▒▒▒▒▒▒▒▀▀▄░░░░░░░
1205            -- ░░░▐░░░░▐▄▒▒▒▒▒▒▒▒▒▒▀▄░░░░░
1206            -- ░░░░▀▄░░░░▀▄▒▒▒▒▒▒▒▒▒▒▀▄░░░
1207            -- ░░░░░░▀▄▄▄▄▄█▄▄▄▄▄▄▄▄▄▄▄▀▄░
1208            -- ░░░░░░░░░░░▌▌░▌▌░░░░░░░░░░░
1209            -- ░░░░░░░░░░░▌▌░▌▌░░░░░░░░░░░
1210            -- ░░░░░░░░░▄▄▌▌▄▌▌░░░░░░░░░░░
1211            --
1212            -- Hello, my name is sea gull.
1213            -- I designed a SQL query which
1214            -- is as smart, as big and as
1215            -- annoying as myself.
1216
1217            SELECT
1218                {select}
1219
1220            FROM
1221                -- Usually we query the "documents" table first. In case we're looking at a relation
1222                -- list this is slighly more complicated and we need to do some additional JOINs
1223                {from}
1224
1225                -- We need to add some more JOINs to get the values from the operations
1226                JOIN operation_fields_v1
1227                    ON
1228                        document_view_fields.operation_id = operation_fields_v1.operation_id
1229                        AND
1230                            document_view_fields.name = operation_fields_v1.name
1231
1232            WHERE
1233                -- We filter by the queried schema of that collection, if it is a relation
1234                -- list we filter by the view id of the parent document
1235                {where_}
1236
1237                -- .. and select only the operation fields we're interested in
1238                {and_fields}
1239
1240                -- .. and further filter the data by custom parameters
1241                {and_filters}
1242
1243                -- Lastly we batch all results into smaller chunks via cursor pagination
1244                {and_pagination}
1245
1246            -- We always order the rows by document id and list index, but there might also be
1247            -- user-defined ordering on top of that
1248            {order}
1249
1250            -- Connected to cursor pagination we limit the number of rows
1251            {limit}
1252        "#
1253        );
1254
1255        let mut query = query_as::<_, QueryRow>(&sea_quel);
1256
1257        // Bind untrusted user arguments to query
1258        query = bind_to_query(query, &bind_args);
1259
1260        let mut rows: Vec<QueryRow> = query
1261            .fetch_all(&self.pool)
1262            .await
1263            .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
1264
1265        // We always query one more row than needed to find out if there's more data. This
1266        // information aids the user during pagination
1267        let has_next_page = if rows.len() as u64 > page_size {
1268            // Remove that last row from final results if it exists
1269            rows.pop();
1270            true
1271        } else {
1272            false
1273        };
1274
1275        // Calculate the total number of (filtered) documents in this query
1276        let total_count = if args
1277            .pagination
1278            .fields
1279            .contains(&PaginationField::TotalCount)
1280        {
1281            Some(self.count(schema, args, list).await?)
1282        } else {
1283            None
1284        };
1285
1286        // Finally convert everything into the right format
1287        let documents = convert_rows(rows, list, &application_fields, schema.id());
1288
1289        // Determine cursors for pagination by looking at beginning and end of results
1290        let start_cursor = if args
1291            .pagination
1292            .fields
1293            .contains(&PaginationField::StartCursor)
1294        {
1295            documents.first().map(|(cursor, _)| cursor.to_owned())
1296        } else {
1297            None
1298        };
1299
1300        let end_cursor = if args.pagination.fields.contains(&PaginationField::EndCursor) {
1301            documents.last().map(|(cursor, _)| cursor.to_owned())
1302        } else {
1303            None
1304        };
1305
1306        let pagination_data = PaginationData {
1307            total_count,
1308            has_next_page,
1309            // @TODO: Implement backwards pagination, see related issue:
1310            // https://github.com/p2panda/aquadoggo/issues/325
1311            has_previous_page: false,
1312            start_cursor,
1313            end_cursor,
1314        };
1315
1316        Ok((pagination_data, documents))
1317    }
1318
1319    /// Query number of documents in filtered collection.
1320    pub async fn count(
1321        &self,
1322        schema: &Schema,
1323        args: &Query<PaginationCursor>,
1324        list: Option<&RelationList>,
1325    ) -> Result<u64, DocumentStorageError> {
1326        let application_fields = args.select.application_fields();
1327
1328        let from = from_sql(list);
1329        let where_ = where_sql(schema, &application_fields, list);
1330        let (and_filters, bind_args) = where_filter_sql(&args.filter, schema);
1331
1332        let count_sql = format!(
1333            r#"
1334            SELECT
1335                COUNT(documents.document_id)
1336
1337            FROM
1338                {from}
1339
1340                JOIN operation_fields_v1
1341                    ON
1342                        document_view_fields.operation_id = operation_fields_v1.operation_id
1343                        AND
1344                            document_view_fields.name = operation_fields_v1.name
1345
1346            WHERE
1347                {where_}
1348                {and_filters}
1349
1350            -- Group application fields by name to make sure we get actual number of documents
1351            GROUP BY operation_fields_v1.name
1352            "#
1353        );
1354
1355        let mut query = query_as::<_, (i64,)>(&count_sql);
1356
1357        // Bind untrusted user arguments to query
1358        query = bind_to_query(query, &bind_args);
1359
1360        let result: Option<(i64,)> = query
1361            .fetch_optional(&self.pool)
1362            .await
1363            .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
1364
1365        let count = match result {
1366            Some(result) => result.0 as u64,
1367            None => 0,
1368        };
1369
1370        Ok(count)
1371    }
1372}
1373
1374/// Merges all operation fields from the database into documents.
1375///
1376/// Due to the special table layout we receive one row per operation field in the query. Usually we
1377/// need to merge multiple rows / operation fields into one document.
1378///
1379/// This method also returns a cursor for each document to the clients which can then use it to
1380/// control pagination. Since every cursor is unique for each operation field and there might be
1381/// multiple cursors for one document we need to make sure to _always_ pick the _last_ cursor for
1382/// each document.
1383///
1384/// ```text
1385/// -------------------------------------------------
1386/// document_id | view_id | field_name | ... | cursor
1387/// -------------------------------------------------
1388/// 0x01        | 0x01    | username   | ... | 0xa0
1389/// 0x01        | 0x01    | message    | ... | 0xc2   <--- Last Cursor
1390/// 0x02        | 0x02    | username   | ... | 0x06
1391/// 0x02        | 0x02    | message    | ... | 0x8b   <--- Last Cursor
1392/// 0x04        | 0x04    | username   | ... | 0x06
1393/// 0x04        | 0x04    | message    | ... | 0x8b   <--- Last Cursor
1394/// -------------------------------------------------
1395/// ```
1396///
1397fn convert_rows(
1398    rows: Vec<QueryRow>,
1399    list: Option<&RelationList>,
1400    fields: &ApplicationFields,
1401    schema_id: &SchemaId,
1402) -> Vec<(PaginationCursor, StorageDocument)> {
1403    let mut converted: Vec<(PaginationCursor, StorageDocument)> = Vec::new();
1404
1405    if rows.is_empty() {
1406        return converted;
1407    }
1408
1409    // Helper method to convert database row into final document and cursor type
1410    let finalize_document = |row: &QueryRow,
1411                             collected_fields: Vec<DocumentViewFieldRow>,
1412                             collected_rows: &HashMap<FieldName, QueryRow>|
1413     -> (PaginationCursor, StorageDocument) {
1414        // Determine cursor for this document by looking at the last field
1415        let cursor = {
1416            let last_field = collected_fields
1417                .last()
1418                .expect("Needs to be at least one field");
1419
1420            row_to_cursor(
1421                collected_rows
1422                    .get(&last_field.name)
1423                    .expect("Field selected for ordering needs to be inside of document"),
1424                list,
1425            )
1426        };
1427
1428        // Convert all gathered data into final `StorageDocument` format
1429        let fields = parse_document_view_field_rows(collected_fields);
1430
1431        let document = StorageDocument {
1432            id: row.document_id.parse().unwrap(),
1433            fields: Some(fields),
1434            schema_id: schema_id.clone(),
1435            view_id: row.document_view_id.parse().unwrap(),
1436            author: (&row.owner).into(),
1437            deleted: row.is_deleted,
1438        };
1439
1440        (cursor, document)
1441    };
1442
1443    let rows_per_document = std::cmp::max(fields.len(), 1);
1444
1445    let mut current = rows[0].clone();
1446    let mut current_fields = Vec::with_capacity(rows_per_document);
1447    let mut current_rows = HashMap::new();
1448
1449    for (index, row) in rows.into_iter().enumerate() {
1450        // We observed a new document coming up in the next row, time to change
1451        if index % rows_per_document == 0 && index > 0 {
1452            // Finalize the current document, convert it and push it into the final array
1453            let (cursor, document) = finalize_document(&current, current_fields, &current_rows);
1454            converted.push((cursor, document));
1455
1456            // Change the pointer to the next document
1457            current = row.clone();
1458            current_fields = Vec::with_capacity(rows_per_document);
1459        }
1460
1461        // Collect original rows from SQL query
1462        current_rows.insert(row.name.clone(), row.clone());
1463
1464        // Collect every field of the document to assemble it later into a `StorageDocument`
1465        current_fields.push(DocumentViewFieldRow {
1466            document_id: row.document_id,
1467            document_view_id: row.document_view_id,
1468            operation_id: row.operation_id,
1469            name: row.name,
1470            list_index: 0,
1471            field_type: row.field_type,
1472            value: row.value,
1473        });
1474    }
1475
1476    // Do it one last time at the end for the last document
1477    let (cursor, document) = finalize_document(&current, current_fields, &current_rows);
1478    converted.push((cursor, document));
1479
1480    converted
1481}
1482
1483/// Get a cursor from a document row.
1484fn row_to_cursor(row: &QueryRow, list: Option<&RelationList>) -> PaginationCursor {
1485    match list {
1486        Some(relation_list) => {
1487            // If we're querying a relation list then we mention the document view id
1488            // of the parent document. This helps us later to understand _which_ of the
1489            // potentially many relation lists we want to paginate
1490            PaginationCursor::new(
1491                row.cmp_value_cursor.as_str().into(),
1492                Some(row.root_cursor.as_str().into()),
1493                Some(relation_list.root_view_id.clone()),
1494            )
1495        }
1496        None => PaginationCursor::new(row.cmp_value_cursor.as_str().into(), None, None),
1497    }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502    use std::num::NonZeroU64;
1503
1504    use p2panda_rs::document::traits::AsDocument;
1505    use p2panda_rs::document::DocumentViewId;
1506    use p2panda_rs::hash::Hash;
1507    use p2panda_rs::identity::KeyPair;
1508    use p2panda_rs::operation::{OperationValue, PinnedRelationList};
1509    use p2panda_rs::schema::{FieldType, Schema, SchemaId};
1510    use p2panda_rs::storage_provider::traits::DocumentStore;
1511    use p2panda_rs::test_utils::fixtures::{key_pair, schema_id};
1512    use rstest::rstest;
1513
1514    use crate::db::models::{OptionalOwner, QueryRow};
1515    use crate::db::query::{
1516        Direction, Field, Filter, MetaField, Order, Pagination, PaginationField, Select,
1517    };
1518    use crate::db::stores::{OperationCursor, RelationList};
1519    use crate::db::types::StorageDocument;
1520    use crate::test_utils::{
1521        add_document, add_schema, add_schema_and_documents, doggo_fields, doggo_schema,
1522        populate_and_materialize, populate_store_config, test_runner, PopulateStoreConfig,
1523        TestNode,
1524    };
1525
1526    use super::{convert_rows, PaginationCursor, Query};
1527
1528    fn get_document_value(document: &StorageDocument, field: &str) -> OperationValue {
1529        document
1530            .fields()
1531            .expect("Expected document fields")
1532            .get(field)
1533            .unwrap_or_else(|| panic!("{}", "Expected '{field}' field to exist in document"))
1534            .value()
1535            .to_owned()
1536    }
1537
1538    async fn create_events_test_data(
1539        node: &mut TestNode,
1540        key_pair: &KeyPair,
1541    ) -> (Schema, Vec<DocumentViewId>) {
1542        add_schema_and_documents(
1543            node,
1544            "events",
1545            vec![
1546                vec![
1547                    (
1548                        "title",
1549                        "Kids Bits! Chiptune for baby squirrels".into(),
1550                        None,
1551                    ),
1552                    ("date", "2023-04-17".into(), None),
1553                    ("ticket_price", 5.75.into(), None),
1554                ],
1555                vec![
1556                    ("title", "The Pandadoodle Flute Trio".into(), None),
1557                    ("date", "2023-04-14".into(), None),
1558                    ("ticket_price", 12.5.into(), None),
1559                ],
1560                vec![
1561                    ("title", "Eventual Consistent Grapefruit".into(), None),
1562                    ("date", "2023-05-02".into(), None),
1563                    ("ticket_price", 24.99.into(), None),
1564                ],
1565                vec![
1566                    (
1567                        "title",
1568                        "Bamboo-Scrumble Rumba Night - Xmas special".into(),
1569                        None,
1570                    ),
1571                    ("date", "2023-12-20".into(), None),
1572                    ("ticket_price", 99.0.into(), None),
1573                ],
1574                vec![
1575                    ("title", "Shoebill - Non-migratory Shoegaze".into(), None),
1576                    ("date", "2023-09-09".into(), None),
1577                    ("ticket_price", 10.00.into(), None),
1578                ],
1579            ],
1580            key_pair,
1581        )
1582        .await
1583    }
1584
1585    async fn create_venues_test_data(
1586        node: &mut TestNode,
1587        key_pair: &KeyPair,
1588    ) -> (Schema, Vec<DocumentViewId>) {
1589        add_schema_and_documents(
1590            node,
1591            "venues",
1592            vec![
1593                vec![("name", "World Wide Feld".into(), None)],
1594                vec![("name", "Internet Explorer".into(), None)],
1595                vec![("name", "p4p space".into(), None)],
1596            ],
1597            key_pair,
1598        )
1599        .await
1600    }
1601
1602    async fn create_visited_test_data(
1603        node: &mut TestNode,
1604        venues_view_ids: Vec<DocumentViewId>,
1605        venues_schema: Schema,
1606        key_pair: &KeyPair,
1607    ) -> (Schema, Vec<DocumentViewId>) {
1608        add_schema_and_documents(
1609            node,
1610            "visited",
1611            vec![
1612                vec![
1613                    ("user", "seagull".into(), None),
1614                    (
1615                        "venues",
1616                        vec![
1617                            venues_view_ids[0].clone(),
1618                            venues_view_ids[0].clone(),
1619                            venues_view_ids[1].clone(),
1620                            venues_view_ids[2].clone(),
1621                            venues_view_ids[0].clone(),
1622                            venues_view_ids[0].clone(),
1623                            venues_view_ids[1].clone(),
1624                        ]
1625                        .into(),
1626                        Some(venues_schema.id().to_owned()),
1627                    ),
1628                ],
1629                vec![
1630                    ("user", "panda".into(), None),
1631                    (
1632                        "venues",
1633                        vec![
1634                            venues_view_ids[1].clone(),
1635                            venues_view_ids[1].clone(),
1636                            venues_view_ids[2].clone(),
1637                        ]
1638                        .into(),
1639                        Some(venues_schema.id().to_owned()),
1640                    ),
1641                ],
1642            ],
1643            key_pair,
1644        )
1645        .await
1646    }
1647
1648    async fn create_chat_test_data(
1649        node: &mut TestNode,
1650        key_pair: &KeyPair,
1651    ) -> (Schema, Vec<DocumentViewId>) {
1652        add_schema_and_documents(
1653            node,
1654            "chat",
1655            vec![
1656                vec![
1657                    ("message", "Hello, Panda!".into(), None),
1658                    ("username", "penguin".into(), None),
1659                    ("timestamp", 1687265969.into(), None),
1660                ],
1661                vec![
1662                    ("message", "Oh, howdy, Pengi!".into(), None),
1663                    ("username", "panda".into(), None),
1664                    ("timestamp", 1687266014.into(), None),
1665                ],
1666                vec![
1667                    ("message", "How are you?".into(), None),
1668                    ("username", "panda".into(), None),
1669                    ("timestamp", 1687266032.into(), None),
1670                ],
1671                vec![
1672                    ("message", "I miss Pengolina. How about you?".into(), None),
1673                    ("username", "penguin".into(), None),
1674                    ("timestamp", 1687266055.into(), None),
1675                ],
1676                vec![
1677                    ("message", "I am cute and very hungry".into(), None),
1678                    ("username", "panda".into(), None),
1679                    ("timestamp", 1687266141.into(), None),
1680                ],
1681                vec![
1682                    ("message", "(°◇°) !!".into(), None),
1683                    ("username", "penguin".into(), None),
1684                    ("timestamp", 1687266160.into(), None),
1685                ],
1686            ],
1687            key_pair,
1688        )
1689        .await
1690    }
1691
1692    #[rstest]
1693    #[case::order_by_date_asc(
1694        Query::new(
1695            &Pagination::default(),
1696            &Select::new(&["title".into()]),
1697            &Filter::new(),
1698            &Order::new(&"date".into(), &Direction::Ascending),
1699        ),
1700        "title".into(),
1701        vec![
1702            "The Pandadoodle Flute Trio".into(),
1703            "Kids Bits! Chiptune for baby squirrels".into(),
1704            "Eventual Consistent Grapefruit".into(),
1705            "Shoebill - Non-migratory Shoegaze".into(),
1706            "Bamboo-Scrumble Rumba Night - Xmas special".into(),
1707        ],
1708    )]
1709    #[case::order_by_date_desc(
1710        Query::new(
1711            &Pagination::default(),
1712            &Select::new(&["date".into()]),
1713            &Filter::new(),
1714            &Order::new(&"date".into(), &Direction::Descending),
1715        ),
1716        "date".into(),
1717        vec![
1718            "2023-12-20".into(),
1719            "2023-09-09".into(),
1720            "2023-05-02".into(),
1721            "2023-04-17".into(),
1722            "2023-04-14".into(),
1723        ],
1724    )]
1725    #[case::filter_by_ticket_price_range(
1726        Query::new(
1727            &Pagination::default(),
1728            &Select::new(&["ticket_price".into()]),
1729            &Filter::new().fields(&[
1730                ("ticket_price_gt", &[10.0.into()]),
1731                ("ticket_price_lt", &[50.0.into()]),
1732            ]),
1733            &Order::new(&"ticket_price".into(), &Direction::Ascending),
1734        ),
1735        "ticket_price".into(),
1736        vec![
1737            12.5.into(),
1738            24.99.into(),
1739        ],
1740    )]
1741    #[case::filter_by_search_string(
1742        Query::new(
1743            &Pagination::default(),
1744            &Select::new(&["title".into()]),
1745            &Filter::new().fields(&[
1746                ("title_contains", &["baby".into()]),
1747            ]),
1748            &Order::default(),
1749        ),
1750        "title".into(),
1751        vec![
1752            "Kids Bits! Chiptune for baby squirrels".into(),
1753        ],
1754    )]
1755    fn basic_queries(
1756        key_pair: KeyPair,
1757        #[case] args: Query<PaginationCursor>,
1758        #[case] selected_field: String,
1759        #[case] expected_fields: Vec<OperationValue>,
1760    ) {
1761        test_runner(|mut node: TestNode| async move {
1762            let (schema, _) = create_events_test_data(&mut node, &key_pair).await;
1763
1764            let (_, documents) = node
1765                .context
1766                .store
1767                .query(&schema, &args, None)
1768                .await
1769                .expect("Query failed");
1770
1771            assert_eq!(documents.len(), expected_fields.len());
1772
1773            // Compare expected field values over all returned documents
1774            for (index, expected_value) in expected_fields.into_iter().enumerate() {
1775                assert_eq!(
1776                    get_document_value(&documents[index].1, &selected_field),
1777                    expected_value
1778                );
1779            }
1780        });
1781    }
1782
1783    #[rstest]
1784    #[case::order_by_timestamp(
1785        Order::new(&"timestamp".into(), &Direction::Ascending),
1786        "message".into(),
1787        vec![
1788            "Hello, Panda!".into(),
1789            "Oh, howdy, Pengi!".into(),
1790            "How are you?".into(),
1791            "I miss Pengolina. How about you?".into(),
1792            "I am cute and very hungry".into(),
1793            "(°◇°) !!".into(),
1794        ],
1795    )]
1796    #[case::order_by_timestamp_descending(
1797        Order::new(&"timestamp".into(), &Direction::Descending),
1798        "message".into(),
1799        vec![
1800            "(°◇°) !!".into(),
1801            "I am cute and very hungry".into(),
1802            "I miss Pengolina. How about you?".into(),
1803            "How are you?".into(),
1804            "Oh, howdy, Pengi!".into(),
1805            "Hello, Panda!".into(),
1806        ],
1807    )]
1808    #[case::order_by_message(
1809        Order::new(&"message".into(), &Direction::Ascending),
1810        "message".into(),
1811        vec![
1812            "(°◇°) !!".into(),
1813            "Hello, Panda!".into(),
1814            "How are you?".into(),
1815            "I am cute and very hungry".into(),
1816            "I miss Pengolina. How about you?".into(),
1817            "Oh, howdy, Pengi!".into(),
1818        ],
1819    )]
1820    fn pagination_over_ordered_fields(
1821        key_pair: KeyPair,
1822        #[case] order: Order,
1823        #[case] selected_field: String,
1824        #[case] expected_fields: Vec<OperationValue>,
1825    ) {
1826        test_runner(|mut node: TestNode| async move {
1827            let (schema, view_ids) = create_chat_test_data(&mut node, &key_pair).await;
1828
1829            let mut cursor: Option<PaginationCursor> = None;
1830
1831            let mut args = Query::new(
1832                &Pagination::new(
1833                    &NonZeroU64::new(1).unwrap(),
1834                    cursor.as_ref(),
1835                    &vec![
1836                        PaginationField::TotalCount,
1837                        PaginationField::EndCursor,
1838                        PaginationField::HasNextPage,
1839                    ],
1840                ),
1841                &Select::new(&[
1842                    Field::Meta(MetaField::DocumentViewId),
1843                    Field::Field("message".into()),
1844                    Field::Field("username".into()),
1845                    Field::Field("timestamp".into()),
1846                ]),
1847                &Filter::default(),
1848                &order,
1849            );
1850
1851            // Go through all pages, one document at a time
1852            for (index, expected_field) in expected_fields.into_iter().enumerate() {
1853                args.pagination.after = cursor;
1854
1855                let (pagination_data, documents) = node
1856                    .context
1857                    .store
1858                    .query(&schema, &args, None)
1859                    .await
1860                    .expect("Query failed");
1861
1862                match pagination_data.end_cursor {
1863                    Some(end_cursor) => {
1864                        cursor = Some(end_cursor);
1865                    }
1866                    None => panic!("Expected cursor"),
1867                }
1868
1869                if view_ids.len() - 1 == index {
1870                    assert!(!pagination_data.has_next_page);
1871                } else {
1872                    assert!(pagination_data.has_next_page);
1873                }
1874
1875                assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64));
1876                assert_eq!(documents.len(), 1);
1877                assert_eq!(documents[0].1.get(&selected_field), Some(&expected_field));
1878                assert_eq!(cursor.as_ref(), Some(&documents[0].0));
1879            }
1880
1881            // Query one last time after we paginated through everything
1882            args.pagination.after = cursor;
1883
1884            let (pagination_data, documents) = node
1885                .context
1886                .store
1887                .query(&schema, &args, None)
1888                .await
1889                .expect("Query failed");
1890
1891            assert_eq!(pagination_data.total_count, Some(view_ids.len() as u64));
1892            assert_eq!(pagination_data.end_cursor, None);
1893            assert!(!pagination_data.has_next_page);
1894            assert_eq!(documents.len(), 0);
1895        });
1896    }
1897
1898    #[rstest]
1899    fn pagination_over_ordered_view_ids(key_pair: KeyPair) {
1900        test_runner(|mut node: TestNode| async move {
1901            let (schema, mut view_ids) = create_events_test_data(&mut node, &key_pair).await;
1902            let view_ids_len = view_ids.len();
1903
1904            // Sort created documents by document view id, to compare to similarily sorted query
1905            // results
1906            view_ids.sort();
1907
1908            let mut cursor: Option<PaginationCursor> = None;
1909
1910            let mut args = Query::new(
1911                &Pagination::new(
1912                    &NonZeroU64::new(1).unwrap(),
1913                    cursor.as_ref(),
1914                    &vec![
1915                        PaginationField::TotalCount,
1916                        PaginationField::EndCursor,
1917                        PaginationField::HasNextPage,
1918                    ],
1919                ),
1920                &Select::new(&[Field::Meta(MetaField::DocumentViewId)]),
1921                &Filter::default(),
1922                &Order::new(
1923                    &Field::Meta(MetaField::DocumentViewId),
1924                    &Direction::Ascending,
1925                ),
1926            );
1927
1928            // Go through all pages, one document at a time
1929            for (index, view_id) in view_ids.into_iter().enumerate() {
1930                args.pagination.after = cursor;
1931
1932                let (pagination_data, documents) = node
1933                    .context
1934                    .store
1935                    .query(&schema, &args, None)
1936                    .await
1937                    .expect("Query failed");
1938
1939                match pagination_data.end_cursor {
1940                    Some(end_cursor) => {
1941                        cursor = Some(end_cursor);
1942                    }
1943                    None => panic!("Expected cursor"),
1944                }
1945
1946                if view_ids_len - 1 == index {
1947                    assert!(!pagination_data.has_next_page);
1948                } else {
1949                    assert!(pagination_data.has_next_page);
1950                }
1951
1952                assert_eq!(pagination_data.total_count, Some(5));
1953                assert_eq!(documents.len(), 1);
1954                assert_eq!(documents[0].1.view_id, view_id);
1955                assert_eq!(cursor.as_ref(), Some(&documents[0].0));
1956            }
1957
1958            // Query one last time after we paginated through everything
1959            args.pagination.after = cursor;
1960
1961            let (pagination_data, documents) = node
1962                .context
1963                .store
1964                .query(&schema, &args, None)
1965                .await
1966                .expect("Query failed");
1967
1968            assert_eq!(pagination_data.total_count, Some(5));
1969            assert_eq!(pagination_data.end_cursor, None);
1970            assert!(!pagination_data.has_next_page);
1971            assert_eq!(documents.len(), 0);
1972        });
1973    }
1974
1975    #[rstest]
1976    fn pinned_relation_list(key_pair: KeyPair) {
1977        test_runner(|mut node: TestNode| async move {
1978            let (venues_schema, venues_view_ids) =
1979                create_venues_test_data(&mut node, &key_pair).await;
1980
1981            let (_, visited_view_ids) = create_visited_test_data(
1982                &mut node,
1983                venues_view_ids,
1984                venues_schema.to_owned(),
1985                &key_pair,
1986            )
1987            .await;
1988
1989            let args = Query::new(
1990                &Pagination::new(
1991                    &NonZeroU64::new(10).unwrap(),
1992                    None,
1993                    &vec![
1994                        PaginationField::TotalCount,
1995                        PaginationField::EndCursor,
1996                        PaginationField::HasNextPage,
1997                    ],
1998                ),
1999                &Select::new(&["name".into()]),
2000                &Filter::default(),
2001                &Order::default(),
2002            );
2003
2004            // Select the pinned relation list "venues" of the first visited document
2005            let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2006
2007            let (pagination_data, documents) = node
2008                .context
2009                .store
2010                .query(&venues_schema, &args, Some(&list))
2011                .await
2012                .expect("Query failed");
2013
2014            assert_eq!(documents.len(), 7);
2015            assert_eq!(pagination_data.total_count, Some(7));
2016            assert_eq!(
2017                get_document_value(&documents[0].1, "name"),
2018                OperationValue::String("World Wide Feld".to_string())
2019            );
2020            assert_eq!(
2021                get_document_value(&documents[4].1, "name"),
2022                OperationValue::String("World Wide Feld".to_string())
2023            );
2024
2025            // Select the pinned relation list "venues" of the second visited document
2026            let list = RelationList::new_pinned(&visited_view_ids[1], "venues");
2027
2028            let (pagination_data, documents) = node
2029                .context
2030                .store
2031                .query(&venues_schema, &args, Some(&list))
2032                .await
2033                .expect("Query failed");
2034
2035            assert_eq!(documents.len(), 3);
2036            assert_eq!(pagination_data.total_count, Some(3));
2037            assert_eq!(
2038                get_document_value(&documents[0].1, "name"),
2039                OperationValue::String("Internet Explorer".to_string())
2040            );
2041            assert_eq!(
2042                get_document_value(&documents[1].1, "name"),
2043                OperationValue::String("Internet Explorer".to_string())
2044            );
2045            assert_eq!(
2046                get_document_value(&documents[2].1, "name"),
2047                OperationValue::String("p4p space".to_string())
2048            );
2049        });
2050    }
2051
2052    #[rstest]
2053    fn empty_pinned_relation_list(key_pair: KeyPair) {
2054        test_runner(|mut node: TestNode| async move {
2055            let (venues_schema, _) = create_venues_test_data(&mut node, &key_pair).await;
2056
2057            let visited_schema = add_schema(
2058                &mut node,
2059                "visited",
2060                vec![(
2061                    "venues",
2062                    FieldType::PinnedRelationList(venues_schema.id().clone()),
2063                )],
2064                &key_pair,
2065            )
2066            .await;
2067
2068            let visited_view_id = add_document(
2069                &mut node,
2070                visited_schema.id(),
2071                vec![(
2072                    "venues",
2073                    OperationValue::PinnedRelationList(PinnedRelationList::new(vec![])),
2074                )],
2075                &key_pair,
2076            )
2077            .await;
2078
2079            // Query selecting only meta field.
2080            let args = Query::new(
2081                &Pagination::new(
2082                    &NonZeroU64::new(10).unwrap(),
2083                    None,
2084                    &vec![
2085                        PaginationField::TotalCount,
2086                        PaginationField::EndCursor,
2087                        PaginationField::HasNextPage,
2088                    ],
2089                ),
2090                &Select::new(&[Field::Meta(MetaField::DocumentId)]),
2091                &Filter::default(),
2092                &Order::default(),
2093            );
2094
2095            // Select the pinned relation list "venues" for the visited document
2096            let list = RelationList::new_pinned(&visited_view_id, "venues");
2097
2098            let (_, documents) = node
2099                .context
2100                .store
2101                .query(&venues_schema, &args, Some(&list))
2102                .await
2103                .expect("Query failed");
2104
2105            assert!(documents.is_empty());
2106
2107            // Query selecting application field.
2108            let args = Query::new(
2109                &Pagination::new(
2110                    &NonZeroU64::new(10).unwrap(),
2111                    None,
2112                    &vec![
2113                        PaginationField::TotalCount,
2114                        PaginationField::EndCursor,
2115                        PaginationField::HasNextPage,
2116                    ],
2117                ),
2118                &Select::new(&["name".into()]),
2119                &Filter::default(),
2120                &Order::default(),
2121            );
2122
2123            // Select the pinned relation list "venues" for the visited document
2124            let list = RelationList::new_pinned(&visited_view_id, "venues");
2125
2126            let (_, documents) = node
2127                .context
2128                .store
2129                .query(&venues_schema, &args, Some(&list))
2130                .await
2131                .expect("Query failed");
2132
2133            assert!(documents.is_empty());
2134
2135            // Query selecting application field
2136            let args = Query::new(
2137                &Pagination::new(
2138                    &NonZeroU64::new(10).unwrap(),
2139                    None,
2140                    &vec![
2141                        PaginationField::TotalCount,
2142                        PaginationField::EndCursor,
2143                        PaginationField::HasNextPage,
2144                    ],
2145                ),
2146                &Select::new(&["venues".into()]),
2147                &Filter::default(),
2148                &Order::default(),
2149            );
2150
2151            node.context
2152                .store
2153                .query(&visited_schema, &args, None)
2154                .await
2155                .expect("Query failed");
2156        });
2157    }
2158
2159    #[rstest]
2160    fn relation_list_pagination_over_ordered_view_ids(key_pair: KeyPair) {
2161        test_runner(|mut node: TestNode| async move {
2162            let (venues_schema, venues_view_ids) =
2163                create_venues_test_data(&mut node, &key_pair).await;
2164
2165            let (_, visited_view_ids) = create_visited_test_data(
2166                &mut node,
2167                venues_view_ids.clone(),
2168                venues_schema.to_owned(),
2169                &key_pair,
2170            )
2171            .await;
2172
2173            let mut view_ids = [
2174                venues_view_ids[0].clone(),
2175                venues_view_ids[0].clone(),
2176                venues_view_ids[1].clone(),
2177                venues_view_ids[2].clone(),
2178                venues_view_ids[0].clone(),
2179                venues_view_ids[0].clone(),
2180                venues_view_ids[1].clone(),
2181            ];
2182            let view_ids_len = view_ids.len();
2183
2184            // Sort created documents by document view id, to compare to similarily sorted query
2185            // results
2186            view_ids.sort();
2187
2188            let mut cursor: Option<PaginationCursor> = None;
2189
2190            // Select the pinned relation list "venues" of the second visited document
2191            let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2192
2193            let mut args = Query::new(
2194                &Pagination::new(
2195                    &NonZeroU64::new(1).unwrap(),
2196                    cursor.as_ref(),
2197                    &vec![
2198                        PaginationField::TotalCount,
2199                        PaginationField::EndCursor,
2200                        PaginationField::HasNextPage,
2201                    ],
2202                ),
2203                &Select::new(&[Field::Meta(MetaField::DocumentViewId)]),
2204                &Filter::default(),
2205                &Order::new(
2206                    &Field::Meta(MetaField::DocumentViewId),
2207                    &Direction::Ascending,
2208                ),
2209            );
2210
2211            // Go through all pages, one document at a time
2212            for (index, view_id) in view_ids.iter().enumerate() {
2213                args.pagination.after = cursor;
2214
2215                let (pagination_data, documents) = node
2216                    .context
2217                    .store
2218                    .query(&venues_schema, &args, Some(&list))
2219                    .await
2220                    .expect("Query failed");
2221
2222                match pagination_data.end_cursor {
2223                    Some(end_cursor) => {
2224                        cursor = Some(end_cursor);
2225                    }
2226                    None => panic!("Expected cursor"),
2227                }
2228
2229                if view_ids_len - 1 == index {
2230                    assert!(!pagination_data.has_next_page);
2231                } else {
2232                    assert!(pagination_data.has_next_page);
2233                }
2234
2235                assert_eq!(pagination_data.total_count, Some(7));
2236                assert_eq!(documents.len(), 1);
2237                assert_eq!(&documents[0].1.view_id, view_id);
2238                assert_eq!(cursor.as_ref(), Some(&documents[0].0));
2239            }
2240
2241            // Query one last time after we paginated through everything
2242            args.pagination.after = cursor;
2243
2244            let (pagination_data, documents) = node
2245                .context
2246                .store
2247                .query(&venues_schema, &args, Some(&list))
2248                .await
2249                .expect("Query failed");
2250
2251            assert_eq!(pagination_data.total_count, Some(7));
2252            assert_eq!(pagination_data.end_cursor, None);
2253            assert!(!pagination_data.has_next_page);
2254            assert_eq!(documents.len(), 0);
2255        });
2256    }
2257
2258    #[rstest]
2259    #[case::default(
2260        Filter::default(),
2261        Order::default(),
2262        vec![
2263            "World Wide Feld".to_string(),
2264            "World Wide Feld".to_string(),
2265            "Internet Explorer".to_string(),
2266            "p4p space".to_string(),
2267            "World Wide Feld".to_string(),
2268            "World Wide Feld".to_string(),
2269            "Internet Explorer".to_string(),
2270        ]
2271    )]
2272    #[case::order_by_name(
2273        Filter::default(),
2274        Order::new(&"name".into(), &Direction::Ascending),
2275        vec![
2276            "Internet Explorer".to_string(),
2277            "Internet Explorer".to_string(),
2278            "p4p space".to_string(),
2279            "World Wide Feld".to_string(),
2280            "World Wide Feld".to_string(),
2281            "World Wide Feld".to_string(),
2282            "World Wide Feld".to_string(),
2283        ]
2284    )]
2285    #[case::search_for_text(
2286        Filter::new().fields(&[("name_contains", &["p".into()])]),
2287        Order::default(),
2288        vec![
2289            "Internet Explorer".to_string(),
2290            "p4p space".to_string(),
2291            "Internet Explorer".to_string(),
2292        ]
2293    )]
2294    #[case::filter_and_order_by_name(
2295        Filter::new().fields(&[("name_in", &["World Wide Feld".into(), "Internet Explorer".into()])]),
2296        Order::new(&"name".into(), &Direction::Descending),
2297        vec![
2298            "World Wide Feld".to_string(),
2299            "World Wide Feld".to_string(),
2300            "World Wide Feld".to_string(),
2301            "World Wide Feld".to_string(),
2302            "Internet Explorer".to_string(),
2303            "Internet Explorer".to_string(),
2304        ]
2305    )]
2306    fn paginated_pinned_relation_list(
2307        key_pair: KeyPair,
2308        #[case] filter: Filter,
2309        #[case] order: Order,
2310        #[case] expected_venues: Vec<String>,
2311    ) {
2312        test_runner(|mut node: TestNode| async move {
2313            let (venues_schema, venues_view_ids) =
2314                create_venues_test_data(&mut node, &key_pair).await;
2315
2316            let (_, visited_view_ids) = create_visited_test_data(
2317                &mut node,
2318                venues_view_ids.clone(),
2319                venues_schema.clone(),
2320                &key_pair,
2321            )
2322            .await;
2323
2324            let documents_len = expected_venues.len() as u64;
2325
2326            let mut cursor: Option<PaginationCursor> = None;
2327
2328            // Select the pinned relation list "venues" of the first visited document
2329            let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2330
2331            let mut args = Query::new(
2332                &Pagination::new(
2333                    &NonZeroU64::new(1).unwrap(),
2334                    cursor.as_ref(),
2335                    &vec![
2336                        PaginationField::TotalCount,
2337                        PaginationField::EndCursor,
2338                        PaginationField::HasNextPage,
2339                    ],
2340                ),
2341                &Select::new(&[
2342                    Field::Meta(MetaField::DocumentViewId),
2343                    Field::Meta(MetaField::Owner),
2344                    "name".into(),
2345                ]),
2346                &filter,
2347                &order,
2348            );
2349
2350            // Go through all pages, one document at a time
2351            for (index, expected_venue) in expected_venues.into_iter().enumerate() {
2352                args.pagination.after = cursor;
2353
2354                let (pagination_data, documents) = node
2355                    .context
2356                    .store
2357                    .query(&venues_schema, &args, Some(&list))
2358                    .await
2359                    .expect("Query failed");
2360
2361                // Check if next cursor exists and prepare it for next iteration
2362                match pagination_data.end_cursor {
2363                    Some(end_cursor) => {
2364                        cursor = Some(end_cursor);
2365                    }
2366                    None => panic!("Expected cursor"),
2367                }
2368
2369                // Check if `has_next_page` flag is correct
2370                if documents_len - 1 == index as u64 {
2371                    assert!(!pagination_data.has_next_page);
2372                } else {
2373                    assert!(pagination_data.has_next_page);
2374                }
2375
2376                // Check if pagination info is correct
2377                assert_eq!(pagination_data.total_count, Some(documents_len));
2378                assert_eq!(cursor.as_ref(), Some(&documents[0].0));
2379
2380                // Check if resulting document is correct
2381                assert_eq!(documents[0].1.author(), &key_pair.public_key());
2382                assert_eq!(documents.len(), 1);
2383                assert_eq!(
2384                    get_document_value(&documents[0].1, "name"),
2385                    expected_venue.into()
2386                );
2387            }
2388
2389            // Go to final, empty page
2390            args.pagination.after = cursor;
2391
2392            let (pagination_data, documents) = node
2393                .context
2394                .store
2395                .query(&venues_schema, &args, Some(&list))
2396                .await
2397                .expect("Query failed");
2398
2399            assert!(!pagination_data.has_next_page);
2400            assert_eq!(pagination_data.total_count, Some(documents_len));
2401            assert_eq!(pagination_data.end_cursor, None);
2402            assert_eq!(documents.len(), 0);
2403        });
2404    }
2405
2406    #[rstest]
2407    #[case::default(Filter::default(), 3)]
2408    #[case::filtered(Filter::new().fields(&[("name_contains", &["Internet".into()])]), 1)]
2409    #[case::no_results(Filter::new().fields(&[("name", &["doesnotexist".into()])]), 0)]
2410    fn count(#[case] filter: Filter, #[case] expected_result: u64, key_pair: KeyPair) {
2411        test_runner(move |mut node: TestNode| async move {
2412            let (venues_schema, _) = create_venues_test_data(&mut node, &key_pair).await;
2413
2414            let args = Query::new(
2415                &Pagination::default(),
2416                &Select::default(),
2417                &filter,
2418                &Order::default(),
2419            );
2420
2421            let result = node
2422                .context
2423                .store
2424                .count(&venues_schema, &args, None)
2425                .await
2426                .unwrap();
2427
2428            assert_eq!(result, expected_result);
2429        });
2430    }
2431
2432    #[rstest]
2433    #[case::default(Filter::default(), 7)]
2434    #[case::filtered_1(Filter::new().fields(&[("name_contains", &["Internet".into()])]), 2)]
2435    #[case::filtered_2(Filter::new().fields(&[("name_not", &["World Wide Feld".into()])]), 3)]
2436    #[case::no_results(Filter::new().fields(&[("name", &["doesnotexist".into()])]), 0)]
2437    fn count_relation_list(
2438        #[case] filter: Filter,
2439        #[case] expected_result: u64,
2440        key_pair: KeyPair,
2441    ) {
2442        test_runner(move |mut node: TestNode| async move {
2443            let (venues_schema, venues_view_ids) =
2444                create_venues_test_data(&mut node, &key_pair).await;
2445
2446            let (_, visited_view_ids) = create_visited_test_data(
2447                &mut node,
2448                venues_view_ids.clone(),
2449                venues_schema.clone(),
2450                &key_pair,
2451            )
2452            .await;
2453
2454            let args = Query::new(
2455                &Pagination::default(),
2456                &Select::default(),
2457                &filter,
2458                &Order::default(),
2459            );
2460
2461            // Select the pinned relation list "venues" of the first visited document
2462            let list = RelationList::new_pinned(&visited_view_ids[0], "venues");
2463
2464            let result = node
2465                .context
2466                .store
2467                .count(&venues_schema, &args, Some(&list))
2468                .await
2469                .unwrap();
2470
2471            assert_eq!(result, expected_result);
2472        });
2473    }
2474
2475    #[rstest]
2476    fn total_count_of_document_with_relation_list_field(key_pair: KeyPair) {
2477        test_runner(|mut node: TestNode| async move {
2478            let (venues_schema, venues_view_ids) =
2479                create_venues_test_data(&mut node, &key_pair).await;
2480
2481            let (visited_schema, _) = create_visited_test_data(
2482                &mut node,
2483                venues_view_ids,
2484                venues_schema.to_owned(),
2485                &key_pair,
2486            )
2487            .await;
2488
2489            let args = Query::new(
2490                &Pagination::new(
2491                    &NonZeroU64::new(25).unwrap(),
2492                    None,
2493                    &vec![PaginationField::TotalCount],
2494                ),
2495                &Select::new(&[Field::Meta(MetaField::DocumentId)]),
2496                &Filter::default(),
2497                &Order::default(),
2498            );
2499
2500            let (pagination_data, documents) = node
2501                .context
2502                .store
2503                .query(&visited_schema, &args, None)
2504                .await
2505                .expect("Query failed");
2506
2507            assert_eq!(documents.len(), 2);
2508            assert_eq!(pagination_data.total_count, Some(2));
2509        });
2510    }
2511
2512    #[rstest]
2513    fn select_cursor_during_conversion(schema_id: SchemaId) {
2514        let relation_list_hash = Hash::new_from_bytes(&[0]).to_string();
2515        let first_document_hash = Hash::new_from_bytes(&[1]).to_string();
2516        let second_document_hash = Hash::new_from_bytes(&[2]).to_string();
2517
2518        let root_cursor_1 = Hash::new_from_bytes(&[0, 1]).to_string();
2519        let root_cursor_2 = Hash::new_from_bytes(&[0, 2]).to_string();
2520        let cursor_1 = Hash::new_from_bytes(&[0, 3]).to_string();
2521        let cursor_2 = Hash::new_from_bytes(&[0, 4]).to_string();
2522        let cursor_3 = Hash::new_from_bytes(&[0, 5]).to_string();
2523        let cursor_4 = Hash::new_from_bytes(&[0, 6]).to_string();
2524
2525        let query_rows = vec![
2526            // First document
2527            // ==============
2528            QueryRow {
2529                document_id: first_document_hash.clone(),
2530                document_view_id: first_document_hash.clone(),
2531                operation_id: first_document_hash.clone(),
2532                is_deleted: false,
2533                is_edited: false,
2534                // This is the "root" cursor, marking the position of the document inside a
2535                // relation list
2536                root_cursor: root_cursor_1.clone(),
2537                // This is the "field" cursor, marking the value we're using to paginate the
2538                // resulting documents with.
2539                //
2540                // Cursors are unique for each operation field.
2541                cmp_value_cursor: cursor_1.clone(), // Cursor #1
2542                owner: OptionalOwner::default(),
2543                name: "username".to_string(),
2544                value: Some("panda".to_string()),
2545                field_type: "str".to_string(),
2546                list_index: 0,
2547            },
2548            QueryRow {
2549                document_id: first_document_hash.clone(),
2550                document_view_id: first_document_hash.clone(),
2551                operation_id: first_document_hash.clone(),
2552                is_deleted: false,
2553                is_edited: false,
2554                root_cursor: root_cursor_1.clone(),
2555                cmp_value_cursor: cursor_2.clone(), // Cursor #2
2556                owner: OptionalOwner::default(),
2557                name: "is_admin".to_string(),
2558                value: Some("false".to_string()),
2559                field_type: "bool".to_string(),
2560                list_index: 0,
2561            },
2562            // Second document
2563            // ===============
2564            QueryRow {
2565                document_id: second_document_hash.clone(),
2566                document_view_id: second_document_hash.clone(),
2567                operation_id: second_document_hash.clone(),
2568                is_deleted: false,
2569                is_edited: false,
2570                root_cursor: root_cursor_2.clone(),
2571                cmp_value_cursor: cursor_3.clone(), // Cursor #3
2572                owner: OptionalOwner::default(),
2573                name: "username".to_string(),
2574                value: Some("penguin".to_string()),
2575                field_type: "str".to_string(),
2576                list_index: 0,
2577            },
2578            QueryRow {
2579                document_id: second_document_hash.clone(),
2580                document_view_id: second_document_hash.clone(),
2581                operation_id: second_document_hash.clone(),
2582                is_deleted: false,
2583                is_edited: false,
2584                root_cursor: root_cursor_2.clone(),
2585                cmp_value_cursor: cursor_4.clone(), // Cursor #4
2586                owner: OptionalOwner::default(),
2587                name: "is_admin".to_string(),
2588                value: Some("true".to_string()),
2589                field_type: "bool".to_string(),
2590                list_index: 0,
2591            },
2592        ];
2593
2594        // 1.
2595        //
2596        // Convert query rows into documents as if this is a relation list query. We do this by
2597        // passing in the relation list information (the "root") from where this query was executed
2598        // from
2599        let result = convert_rows(
2600            query_rows.clone(),
2601            Some(&RelationList::new_unpinned(
2602                &relation_list_hash.parse().unwrap(),
2603                "relation_list_field",
2604            )),
2605            &vec!["username".to_string(), "is_admin".to_string()],
2606            &schema_id,
2607        );
2608
2609        assert_eq!(result.len(), 2);
2610
2611        // We expect the cursor of the last query row to be returned per document, that is cursor
2612        // #2 and #4
2613        assert_eq!(
2614            result[0].0,
2615            PaginationCursor::new(
2616                OperationCursor::from(cursor_2.as_str()),
2617                Some(OperationCursor::from(root_cursor_1.as_str())),
2618                Some(relation_list_hash.parse().unwrap())
2619            )
2620        );
2621        assert_eq!(
2622            result[1].0,
2623            PaginationCursor::new(
2624                OperationCursor::from(cursor_4.as_str()),
2625                Some(OperationCursor::from(root_cursor_2.as_str())),
2626                Some(relation_list_hash.parse().unwrap())
2627            )
2628        );
2629
2630        // 2.
2631        //
2632        // We pretend now that this query was executed without a relation list
2633        let result = convert_rows(
2634            query_rows,
2635            None,
2636            &vec!["username".to_string(), "is_admin".to_string()],
2637            &schema_id,
2638        );
2639
2640        assert_eq!(result.len(), 2);
2641        assert_eq!(
2642            result[0].0,
2643            PaginationCursor::new(OperationCursor::from(cursor_2.as_str()), None, None)
2644        );
2645        assert_eq!(
2646            result[1].0,
2647            PaginationCursor::new(OperationCursor::from(cursor_4.as_str()), None, None)
2648        );
2649    }
2650
2651    #[rstest]
2652    fn query_updated_documents_with_filter(
2653        #[from(populate_store_config)]
2654        // This config will populate the store with 10 documents which each have their username
2655        // field updated from "bubu" (doggo_schema) to "me"
2656        #[with(2, 10, vec![KeyPair::new()], false, doggo_schema(), doggo_fields(),
2657               vec![("username", OperationValue::String("me".to_string()))]
2658        )]
2659        config: PopulateStoreConfig,
2660    ) {
2661        test_runner(|mut node: TestNode| async move {
2662            // Populate the store and materialize all documents.
2663            populate_and_materialize(&mut node, &config).await;
2664
2665            let schema = doggo_schema();
2666
2667            let documents = node
2668                .context
2669                .store
2670                .get_documents_by_schema(schema.id())
2671                .await
2672                .unwrap();
2673            assert_eq!(documents.len(), 10);
2674            for document in documents {
2675                if document.get("username").unwrap() != &OperationValue::String("me".into()) {
2676                    panic!("All 'username' fields should have been updated to 'me'");
2677                }
2678            }
2679
2680            let args = Query::new(
2681                &Pagination::new(
2682                    &NonZeroU64::new(5).unwrap(),
2683                    None,
2684                    &vec![PaginationField::TotalCount],
2685                ),
2686                &Select::new(&[
2687                    Field::Meta(MetaField::DocumentId),
2688                    Field::Field("username".into()),
2689                    Field::Field("height".into()),
2690                    Field::Field("age".into()),
2691                    Field::Field("is_admin".into()),
2692                ]),
2693                &Filter::default().fields(&[("username", &["me".into()])]),
2694                &Order::default(),
2695            );
2696
2697            let (_pagination_data, documents) = node
2698                .context
2699                .store
2700                .query(&schema, &args, None)
2701                .await
2702                .expect("Query failed");
2703
2704            // We expect 5 documents and each should contain 4 fields.
2705            assert_eq!(documents.len(), 5);
2706            for (_cursor, document) in documents {
2707                assert_eq!(document.fields().unwrap().len(), 4);
2708                assert!(document.is_edited());
2709            }
2710        });
2711    }
2712}