Skip to main content

shelly_data/
repo.rs

1use crate::{
2    adapter::{AdapterKind, DatabaseConfig},
3    error::{DataError, DataResult},
4    query::{
5        Filter, FilterOperator, KeysetDirection, Query, SortDirection, WindowToken,
6        WireFormatProfile,
7    },
8    QueryContext,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{
13    cmp::Ordering,
14    collections::{BTreeMap, HashMap},
15    sync::Arc,
16    time::{Instant, SystemTime, UNIX_EPOCH},
17};
18use tracing::{info, warn};
19
/// Payload of a single record: column name mapped to its JSON value, ordered by column name.
pub type Row = BTreeMap<String, Value>;
21
/// A [`Row`] together with the numeric identifier it is stored under.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StoredRow {
    /// Identifier of the row within the repository.
    pub id: u64,
    /// Column values of the row.
    pub data: Row,
}
27
/// Column-oriented encoding of a set of rows: one shared header plus positional value arrays.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactRowsPayload {
    /// Column names; the position of a name is the position of its value in every row.
    pub columns: Vec<String>,
    /// One value array per row, aligned with `columns`.
    pub rows: Vec<Vec<Value>>,
}
33
/// One window of a query result, plus the metadata needed to request or diff the next one.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowPage {
    /// Index of the first returned row within the result set the window was cut from.
    pub offset: usize,
    /// Maximum number of rows the window may contain.
    pub limit: usize,
    /// Size of the result set the window was cut from.
    pub total_rows: usize,
    /// Rows that fall inside the window.
    pub rows: Vec<StoredRow>,
    /// Fingerprint of the query that produced this page.
    pub query_fingerprint: String,
    /// Token describing this window, issued by the query.
    pub token: WindowToken,
    /// Wire format the query asked for.
    pub wire_format: WireFormatProfile,
    /// Column-oriented copy of `rows`; optional.
    pub compact_rows: Option<CompactRowsPayload>,
}
45
/// Difference between two [`WindowPage`]s, expressed as row-level changes.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IncrementalDiff {
    /// Token of the page the diff starts from, when there is one.
    pub from_token: Option<WindowToken>,
    /// Token of the page the diff leads to.
    pub to_token: WindowToken,
    /// `true` when the two pages are not comparable and the consumer should replace its
    /// state with `inserted` instead of patching it.
    pub full_resync: bool,
    /// Rows present in the newer page but not in the older one.
    pub inserted: Vec<StoredRow>,
    /// Rows present in both pages whose data changed.
    pub updated: Vec<StoredRow>,
    /// Ids of rows present in the older page but not in the newer one.
    pub removed_ids: Vec<u64>,
}
55
/// A database backend handle that can report which [`AdapterKind`] it represents.
pub trait AdapterDriver: Send + Sync {
    /// Returns the adapter kind this driver stands for.
    fn kind(&self) -> AdapterKind;
}
59
/// Stateless driver that identifies itself as [`AdapterKind::Postgres`].
#[derive(Debug, Default, Clone, Copy)]
pub struct PostgresAdapter;

impl AdapterDriver for PostgresAdapter {
    /// Always [`AdapterKind::Postgres`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::Postgres
    }
}
68
/// Stateless driver that identifies itself as [`AdapterKind::MySql`].
#[derive(Debug, Default, Clone, Copy)]
pub struct MySqlAdapter;

impl AdapterDriver for MySqlAdapter {
    /// Always [`AdapterKind::MySql`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::MySql
    }
}
77
/// Stateless driver that identifies itself as [`AdapterKind::Sqlite`].
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteAdapter;

impl AdapterDriver for SqliteAdapter {
    /// Always [`AdapterKind::Sqlite`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::Sqlite
    }
}
86
/// Stateless driver that identifies itself as [`AdapterKind::SingleStore`].
#[derive(Debug, Default, Clone, Copy)]
pub struct SingleStoreDriver;

impl AdapterDriver for SingleStoreDriver {
    /// Always [`AdapterKind::SingleStore`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::SingleStore
    }
}
95
/// Stateless driver that identifies itself as [`AdapterKind::ClickHouse`].
#[derive(Debug, Default, Clone, Copy)]
pub struct ClickHouseDriver;

impl AdapterDriver for ClickHouseDriver {
    /// Always [`AdapterKind::ClickHouse`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::ClickHouse
    }
}
104
/// Stateless driver that identifies itself as [`AdapterKind::BigQuery`].
#[derive(Debug, Default, Clone, Copy)]
pub struct BigQueryDriver;

impl AdapterDriver for BigQueryDriver {
    /// Always [`AdapterKind::BigQuery`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::BigQuery
    }
}
113
/// Stateless driver that identifies itself as [`AdapterKind::OpenSearch`].
#[derive(Debug, Default, Clone, Copy)]
pub struct OpenSearchAdapterDriver;

impl AdapterDriver for OpenSearchAdapterDriver {
    /// Always [`AdapterKind::OpenSearch`].
    fn kind(&self) -> AdapterKind {
        AdapterKind::OpenSearch
    }
}
122
123pub fn adapter_for(config: &DatabaseConfig) -> DataResult<Box<dyn AdapterDriver>> {
124    match config.adapter {
125        AdapterKind::Postgres => Ok(Box::new(PostgresAdapter)),
126        AdapterKind::MySql => Ok(Box::new(MySqlAdapter)),
127        AdapterKind::Sqlite => Ok(Box::new(SqliteAdapter)),
128        AdapterKind::SingleStore => Ok(Box::new(SingleStoreDriver)),
129        AdapterKind::ClickHouse => Ok(Box::new(ClickHouseDriver)),
130        AdapterKind::BigQuery => Ok(Box::new(BigQueryDriver)),
131        AdapterKind::OpenSearch => Ok(Box::new(OpenSearchAdapterDriver)),
132        AdapterKind::None => Err(DataError::Adapter(
133            "database adapter is `none`; select a supported backend in shelly.data.toml"
134                .to_string(),
135        )),
136    }
137}
138
/// Table-oriented storage interface: CRUD by row id, query-driven listing, and page diffing.
pub trait Repo {
    /// Kind of adapter backing this repository.
    fn adapter_kind(&self) -> AdapterKind;
    /// Stores `data` as a new row of `table` and returns it with its assigned id.
    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow>;
    /// Replaces the data of row `id` in `table` and returns the updated row.
    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow>;
    /// Removes row `id` from `table`.
    fn delete(&mut self, table: &str, id: u64) -> DataResult<()>;
    /// Looks up row `id` in `table`; `Ok(None)` when it does not exist.
    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>>;
    /// Returns the rows of `table` selected by `query`.
    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>>;
    /// Returns one window of the rows of `table` selected by `query`, with paging metadata.
    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage>;
    /// Computes the row-level changes that turn `previous` into `current`.
    fn materialize_incremental_diff(
        &self,
        previous: &WindowPage,
        current: &WindowPage,
    ) -> IncrementalDiff;
}
153
/// Operation being attempted through a tenant-scoped repository, as reported to the
/// policy hook. Serialized in `snake_case` (for example `list_window`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantRepoOperation {
    /// Creating a row.
    Insert,
    /// Replacing the data of an existing row.
    Update,
    /// Removing a row.
    Delete,
    /// Fetching a single row by id.
    Find,
    /// Listing rows.
    List,
    /// Listing one window of rows.
    ListWindow,
}
164
/// Facts about a scoped data operation, handed to the tenant policy hook for a decision.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyContext {
    /// Table the operation targets.
    pub table: String,
    /// Kind of operation being attempted.
    pub operation: TenantRepoOperation,
    /// Tenant the caller acts as, if any.
    pub tenant_id: Option<String>,
    /// Tenant recorded on the row or payload under inspection, if any; list operations
    /// leave this empty because no single row is involved.
    pub row_tenant_id: Option<String>,
    /// Id of the row under inspection, when the operation addresses one.
    pub row_id: Option<u64>,
}
173
/// Verdict returned by a tenant policy hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyDecision {
    /// Whether the operation may proceed.
    pub allowed: bool,
    /// Machine-readable reason for a denial.
    pub code: Option<String>,
    /// Human-readable reason for a denial.
    pub message: Option<String>,
}

impl TenantPolicyDecision {
    /// Verdict that lets the operation through; carries no code or message.
    pub fn allow() -> Self {
        let (code, message) = (None, None);
        Self {
            allowed: true,
            code,
            message,
        }
    }

    /// Verdict that blocks the operation, explained by `code` and `message`.
    pub fn deny(code: impl Into<String>, message: impl Into<String>) -> Self {
        let code = Some(code.into());
        let message = Some(message.into());
        Self {
            allowed: false,
            code,
            message,
        }
    }
}
198
/// Shareable, thread-safe callback that decides whether a scoped operation may proceed.
pub type TenantPolicyHook = Arc<dyn Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync>;
200
/// Settings that control how [`TenantScopedRepo`] enforces tenant isolation.
#[derive(Clone)]
pub struct TenantRepoConfig {
    /// Name of the row column that stores the owning tenant id.
    pub tenant_field: String,
    /// When `true`, scoped operations fail unless the query context names a tenant.
    pub require_tenant_context: bool,
    /// Optional callback consulted for every scoped operation.
    pub policy_hook: Option<TenantPolicyHook>,
}

impl Default for TenantRepoConfig {
    /// Defaults: tenant column `tenant_id`, tenant context required, no policy hook.
    fn default() -> Self {
        Self {
            tenant_field: "tenant_id".to_string(),
            require_tenant_context: true,
            policy_hook: None,
        }
    }
}
217
/// Wrapper around a [`Repo`] whose `*_scoped` methods enforce tenant ownership rules
/// before delegating to the wrapped repository.
pub struct TenantScopedRepo<R: Repo> {
    /// Repository that performs the actual storage work.
    inner: R,
    /// Tenant enforcement settings.
    config: TenantRepoConfig,
}
222
223impl<R: Repo> TenantScopedRepo<R> {
    /// Wraps `inner` using the default [`TenantRepoConfig`].
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            config: TenantRepoConfig::default(),
        }
    }
230
    /// Replaces the whole configuration, including any policy hook set earlier.
    pub fn with_config(mut self, config: TenantRepoConfig) -> Self {
        self.config = config;
        self
    }
235
    /// Installs `policy_hook` as the policy callback, replacing any previous one.
    pub fn with_policy_hook<F>(mut self, policy_hook: F) -> Self
    where
        F: Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync + 'static,
    {
        self.config.policy_hook = Some(Arc::new(policy_hook));
        self
    }
243
    /// Borrows the wrapped repository. Calls made through it bypass tenant checks.
    pub fn inner(&self) -> &R {
        &self.inner
    }
247
    /// Mutably borrows the wrapped repository. Calls made through it bypass tenant checks.
    pub fn inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }
251
    /// Discards the wrapper and returns the wrapped repository.
    pub fn into_inner(self) -> R {
        self.inner
    }
255
256    pub fn insert_scoped(
257        &mut self,
258        table: &str,
259        context: &QueryContext,
260        mut data: Row,
261    ) -> DataResult<StoredRow> {
262        let tenant_id = self.required_tenant_id(context)?;
263        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
264        self.ensure_tenant_match(
265            table,
266            TenantRepoOperation::Insert,
267            tenant_id.as_deref(),
268            row_tenant_id.as_deref(),
269            None,
270        )?;
271        if let Some(tenant_id) = tenant_id.as_deref() {
272            data.insert(
273                self.config.tenant_field.clone(),
274                Value::String(tenant_id.to_string()),
275            );
276        }
277        self.inner.insert(table, data)
278    }
279
280    pub fn update_scoped(
281        &mut self,
282        table: &str,
283        id: u64,
284        context: &QueryContext,
285        mut data: Row,
286    ) -> DataResult<StoredRow> {
287        let tenant_id = self.required_tenant_id(context)?;
288        let existing = self.inner.find(table, id)?;
289        if let Some(existing) = existing.as_ref() {
290            self.ensure_tenant_match(
291                table,
292                TenantRepoOperation::Update,
293                tenant_id.as_deref(),
294                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
295                Some(id),
296            )?;
297        }
298        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
299        self.ensure_tenant_match(
300            table,
301            TenantRepoOperation::Update,
302            tenant_id.as_deref(),
303            row_tenant_id.as_deref(),
304            Some(id),
305        )?;
306        if let Some(tenant_id) = tenant_id.as_deref() {
307            data.insert(
308                self.config.tenant_field.clone(),
309                Value::String(tenant_id.to_string()),
310            );
311        }
312        self.inner.update(table, id, data)
313    }
314
315    pub fn delete_scoped(
316        &mut self,
317        table: &str,
318        id: u64,
319        context: &QueryContext,
320    ) -> DataResult<()> {
321        let tenant_id = self.required_tenant_id(context)?;
322        let existing = self.inner.find(table, id)?;
323        if let Some(existing) = existing.as_ref() {
324            self.ensure_tenant_match(
325                table,
326                TenantRepoOperation::Delete,
327                tenant_id.as_deref(),
328                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
329                Some(id),
330            )?;
331        }
332        self.inner.delete(table, id)
333    }
334
335    pub fn find_scoped(
336        &self,
337        table: &str,
338        id: u64,
339        context: &QueryContext,
340    ) -> DataResult<Option<StoredRow>> {
341        let tenant_id = self.required_tenant_id(context)?;
342        let row = self.inner.find(table, id)?;
343        if let Some(row) = row.as_ref() {
344            self.ensure_tenant_match(
345                table,
346                TenantRepoOperation::Find,
347                tenant_id.as_deref(),
348                tenant_id_from_row(&row.data, self.config.tenant_field.as_str()).as_deref(),
349                Some(id),
350            )?;
351        }
352        Ok(row)
353    }
354
355    pub fn list_scoped(
356        &self,
357        table: &str,
358        query: &Query,
359        context: &QueryContext,
360    ) -> DataResult<Vec<StoredRow>> {
361        let tenant_id = self.required_tenant_id(context)?;
362        self.evaluate_policy(TenantPolicyContext {
363            table: table.to_string(),
364            operation: TenantRepoOperation::List,
365            tenant_id: tenant_id.clone(),
366            row_tenant_id: None,
367            row_id: None,
368        })?;
369        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
370        self.inner.list(table, &scoped_query)
371    }
372
373    pub fn list_window_scoped(
374        &self,
375        table: &str,
376        query: &Query,
377        context: &QueryContext,
378    ) -> DataResult<WindowPage> {
379        let tenant_id = self.required_tenant_id(context)?;
380        self.evaluate_policy(TenantPolicyContext {
381            table: table.to_string(),
382            operation: TenantRepoOperation::ListWindow,
383            tenant_id: tenant_id.clone(),
384            row_tenant_id: None,
385            row_id: None,
386        })?;
387        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
388        self.inner.list_window(table, &scoped_query)
389    }
390
391    fn scoped_query(&self, query: &Query, tenant_id: Option<&str>) -> Query {
392        let mut scoped = query.clone();
393        if let Some(tenant_id) = tenant_id {
394            scoped.filters.push(Filter::eq(
395                self.config.tenant_field.clone(),
396                Value::String(tenant_id.to_string()),
397            ));
398        }
399        scoped
400    }
401
402    fn required_tenant_id(&self, context: &QueryContext) -> DataResult<Option<String>> {
403        let tenant_id = normalize_tenant_id(context.tenant_id.as_deref());
404        if self.config.require_tenant_context && tenant_id.is_none() {
405            return Err(tenant_error(
406                "tenant_context_required",
407                "tenant context is required for scoped data operation",
408            ));
409        }
410        Ok(tenant_id)
411    }
412
413    fn ensure_tenant_match(
414        &self,
415        table: &str,
416        operation: TenantRepoOperation,
417        tenant_id: Option<&str>,
418        row_tenant_id: Option<&str>,
419        row_id: Option<u64>,
420    ) -> DataResult<()> {
421        let normalized_tenant_id = normalize_tenant_id(tenant_id);
422        let normalized_row_tenant_id = normalize_tenant_id(row_tenant_id);
423        if let Some(expected_tenant) = normalized_tenant_id.as_deref() {
424            if let Some(row_tenant) = normalized_row_tenant_id.as_deref() {
425                if row_tenant != expected_tenant {
426                    return Err(tenant_error(
427                        "tenant_row_mismatch",
428                        format!(
429                            "row tenant does not match tenant context for table `{table}` (row_id={row_id:?})"
430                        ),
431                    ));
432                }
433            }
434        }
435        self.evaluate_policy(TenantPolicyContext {
436            table: table.to_string(),
437            operation,
438            tenant_id: normalized_tenant_id,
439            row_tenant_id: normalized_row_tenant_id,
440            row_id,
441        })
442    }
443
444    fn evaluate_policy(&self, context: TenantPolicyContext) -> DataResult<()> {
445        let Some(policy_hook) = self.config.policy_hook.as_ref() else {
446            return Ok(());
447        };
448        let decision = policy_hook(&context);
449        if decision.allowed {
450            Ok(())
451        } else {
452            Err(tenant_error(
453                decision.code.as_deref().unwrap_or("tenant_policy_denied"),
454                decision
455                    .message
456                    .as_deref()
457                    .unwrap_or("tenant policy denied data operation"),
458            ))
459        }
460    }
461}
462
463fn tenant_error(code: &str, message: impl Into<String>) -> DataError {
464    DataError::Query(format!("[{code}] {}", message.into()))
465}
466
/// Trims surrounding whitespace from a tenant id and treats an empty result as absent.
///
/// Returns `None` for `None`, `""`, and whitespace-only input; otherwise the trimmed id.
fn normalize_tenant_id(tenant_id: Option<&str>) -> Option<String> {
    let trimmed = tenant_id?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
473
474fn tenant_id_from_row(row: &Row, tenant_field: &str) -> Option<String> {
475    row.get(tenant_field)
476        .and_then(Value::as_str)
477        .and_then(|value| normalize_tenant_id(Some(value)))
478}
479
/// In-memory [`Repo`] that keeps every table as a vector of rows.
pub struct MemoryRepo {
    /// Driver whose kind this repository reports and logs.
    driver: Box<dyn AdapterDriver>,
    /// Rows per table name, in insertion order.
    tables: BTreeMap<String, Vec<StoredRow>>,
    /// Id handed to the next inserted row; one counter shared by all tables.
    next_id: u64,
}

impl MemoryRepo {
    /// Creates an empty repository whose first inserted row gets id 1.
    pub fn new(driver: Box<dyn AdapterDriver>) -> Self {
        Self {
            driver,
            tables: BTreeMap::new(),
            next_id: 1,
        }
    }
}
495
496impl Repo for MemoryRepo {
    /// Reports the kind of the driver this repository was constructed with.
    fn adapter_kind(&self) -> AdapterKind {
        self.driver.kind()
    }
500
501    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow> {
502        let started_at = Instant::now();
503        let result = {
504            let entry = self.tables.entry(table.to_string()).or_default();
505            let row = StoredRow {
506                id: self.next_id,
507                data,
508            };
509            self.next_id += 1;
510            entry.push(row.clone());
511            Ok(row)
512        };
513
514        match &result {
515            Ok(row) => info!(
516                target: "shelly.data.query",
517                source = "memory_repo",
518                adapter = self.driver.kind().as_str(),
519                operation = "insert",
520                table,
521                row_id = row.id,
522                duration_ms = started_at.elapsed().as_millis() as u64,
523                "Shelly data query executed"
524            ),
525            Err(err) => warn!(
526                target: "shelly.data.query",
527                source = "memory_repo",
528                adapter = self.driver.kind().as_str(),
529                operation = "insert",
530                table,
531                duration_ms = started_at.elapsed().as_millis() as u64,
532                error = %err,
533                "Shelly data query failed"
534            ),
535        }
536
537        result
538    }
539
540    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow> {
541        let started_at = Instant::now();
542        let result = {
543            let rows = self.tables.entry(table.to_string()).or_default();
544            match rows.iter_mut().find(|row| row.id == id) {
545                Some(existing) => {
546                    existing.data = data;
547                    Ok(existing.clone())
548                }
549                None => Err(DataError::Query(format!(
550                    "row id {id} not found in table `{table}`"
551                ))),
552            }
553        };
554
555        match &result {
556            Ok(row) => info!(
557                target: "shelly.data.query",
558                source = "memory_repo",
559                adapter = self.driver.kind().as_str(),
560                operation = "update",
561                table,
562                row_id = row.id,
563                duration_ms = started_at.elapsed().as_millis() as u64,
564                "Shelly data query executed"
565            ),
566            Err(err) => warn!(
567                target: "shelly.data.query",
568                source = "memory_repo",
569                adapter = self.driver.kind().as_str(),
570                operation = "update",
571                table,
572                row_id = id,
573                duration_ms = started_at.elapsed().as_millis() as u64,
574                error = %err,
575                "Shelly data query failed"
576            ),
577        }
578
579        result
580    }
581
582    fn delete(&mut self, table: &str, id: u64) -> DataResult<()> {
583        let started_at = Instant::now();
584        let result = {
585            let rows = self.tables.entry(table.to_string()).or_default();
586            let initial_len = rows.len();
587            rows.retain(|row| row.id != id);
588            if rows.len() == initial_len {
589                Err(DataError::Query(format!(
590                    "row id {id} not found in table `{table}`"
591                )))
592            } else {
593                Ok(())
594            }
595        };
596
597        match &result {
598            Ok(()) => info!(
599                target: "shelly.data.query",
600                source = "memory_repo",
601                adapter = self.driver.kind().as_str(),
602                operation = "delete",
603                table,
604                row_id = id,
605                duration_ms = started_at.elapsed().as_millis() as u64,
606                "Shelly data query executed"
607            ),
608            Err(err) => warn!(
609                target: "shelly.data.query",
610                source = "memory_repo",
611                adapter = self.driver.kind().as_str(),
612                operation = "delete",
613                table,
614                row_id = id,
615                duration_ms = started_at.elapsed().as_millis() as u64,
616                error = %err,
617                "Shelly data query failed"
618            ),
619        }
620
621        result
622    }
623
624    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>> {
625        let started_at = Instant::now();
626        let result = Ok(self
627            .tables
628            .get(table)
629            .and_then(|rows| rows.iter().find(|row| row.id == id))
630            .cloned());
631
632        match &result {
633            Ok(row) => info!(
634                target: "shelly.data.query",
635                source = "memory_repo",
636                adapter = self.driver.kind().as_str(),
637                operation = "find",
638                table,
639                row_id = id,
640                found = row.is_some(),
641                duration_ms = started_at.elapsed().as_millis() as u64,
642                "Shelly data query executed"
643            ),
644            Err(err) => warn!(
645                target: "shelly.data.query",
646                source = "memory_repo",
647                adapter = self.driver.kind().as_str(),
648                operation = "find",
649                table,
650                row_id = id,
651                duration_ms = started_at.elapsed().as_millis() as u64,
652                error = %err,
653                "Shelly data query failed"
654            ),
655        }
656
657        result
658    }
659
660    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>> {
661        let started_at = Instant::now();
662        let result = {
663            let rows = self.tables.get(table).cloned().unwrap_or_default();
664            let rows = materialize_rows(rows, query);
665            Ok(rows)
666        };
667
668        match &result {
669            Ok(rows) => info!(
670                target: "shelly.data.query",
671                source = "memory_repo",
672                adapter = self.driver.kind().as_str(),
673                operation = "list",
674                table,
675                row_count = rows.len(),
676                filter_count = query.filters.len(),
677                sort_count = query.sorts.len(),
678                page = query.pagination.map(|value| value.page),
679                per_page = query.pagination.map(|value| value.per_page),
680                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
681                wire_format = ?query.wire_format,
682                duration_ms = started_at.elapsed().as_millis() as u64,
683                "Shelly data query executed"
684            ),
685            Err(err) => warn!(
686                target: "shelly.data.query",
687                source = "memory_repo",
688                adapter = self.driver.kind().as_str(),
689                operation = "list",
690                table,
691                filter_count = query.filters.len(),
692                sort_count = query.sorts.len(),
693                page = query.pagination.map(|value| value.page),
694                per_page = query.pagination.map(|value| value.per_page),
695                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
696                wire_format = ?query.wire_format,
697                duration_ms = started_at.elapsed().as_millis() as u64,
698                error = %err,
699                "Shelly data query failed"
700            ),
701        }
702
703        result
704    }
705
706    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage> {
707        if !query.has_valid_window_token() {
708            return Err(DataError::Query(
709                "window token query fingerprint mismatch".to_string(),
710            ));
711        }
712
713        let started_at = Instant::now();
714        let all_rows = materialize_rows(self.tables.get(table).cloned().unwrap_or_default(), query);
715        let total_rows = all_rows.len();
716        let (offset, limit) = window_range_from_query(query, total_rows);
717        let rows = all_rows
718            .into_iter()
719            .skip(offset)
720            .take(limit.max(1))
721            .collect::<Vec<_>>();
722        let token = query.next_window_token(offset, limit.max(1), now_epoch_ms(), {
723            (offset as u64)
724                ^ (limit as u64)
725                ^ (rows.len() as u64)
726                ^ (total_rows as u64)
727                ^ self.next_id
728        });
729        let compact_rows =
730            (query.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
731        let page = WindowPage {
732            offset,
733            limit: limit.max(1),
734            total_rows,
735            rows,
736            query_fingerprint: query.fingerprint(),
737            token,
738            wire_format: query.wire_format,
739            compact_rows,
740        };
741
742        info!(
743            target: "shelly.data.query",
744            source = "memory_repo",
745            adapter = self.driver.kind().as_str(),
746            operation = "list_window",
747            table,
748            row_count = page.rows.len(),
749            total_rows = page.total_rows,
750            offset = page.offset,
751            limit = page.limit,
752            wire_format = ?page.wire_format,
753            duration_ms = started_at.elapsed().as_millis() as u64,
754            "Shelly data query executed"
755        );
756
757        Ok(page)
758    }
759
760    fn materialize_incremental_diff(
761        &self,
762        previous: &WindowPage,
763        current: &WindowPage,
764    ) -> IncrementalDiff {
765        let full_resync = previous.query_fingerprint != current.query_fingerprint
766            || previous.wire_format != current.wire_format;
767
768        if full_resync {
769            return IncrementalDiff {
770                from_token: Some(previous.token.clone()),
771                to_token: current.token.clone(),
772                full_resync: true,
773                inserted: current.rows.clone(),
774                updated: Vec::new(),
775                removed_ids: Vec::new(),
776            };
777        }
778
779        let previous_rows: HashMap<u64, &StoredRow> =
780            previous.rows.iter().map(|row| (row.id, row)).collect();
781        let current_rows: HashMap<u64, &StoredRow> =
782            current.rows.iter().map(|row| (row.id, row)).collect();
783
784        let mut inserted = Vec::new();
785        let mut updated = Vec::new();
786        for row in &current.rows {
787            match previous_rows.get(&row.id) {
788                None => inserted.push(row.clone()),
789                Some(previous_row) if previous_row.data != row.data => updated.push(row.clone()),
790                _ => {}
791            }
792        }
793
794        let mut removed_ids = previous_rows
795            .keys()
796            .filter(|id| !current_rows.contains_key(id))
797            .copied()
798            .collect::<Vec<_>>();
799        removed_ids.sort_unstable();
800
801        IncrementalDiff {
802            from_token: Some(previous.token.clone()),
803            to_token: current.token.clone(),
804            full_resync: false,
805            inserted,
806            updated,
807            removed_ids,
808        }
809    }
810}
811
812fn materialize_rows(mut rows: Vec<StoredRow>, query: &Query) -> Vec<StoredRow> {
813    if !query.filters.is_empty() {
814        rows.retain(|row| {
815            query
816                .filters
817                .iter()
818                .all(|filter| matches_filter(row, filter))
819        });
820    }
821
822    for sort in query.sorts.iter().rev() {
823        rows.sort_by(|left, right| compare_for_sort(left, right, sort.field.as_str()));
824        if sort.direction == SortDirection::Desc {
825            rows.reverse();
826        }
827    }
828
829    if let Some(keyset) = &query.keyset {
830        rows = apply_keyset(rows, keyset);
831    } else if let Some(pagination) = query.pagination {
832        let offset = (pagination.page.saturating_sub(1)) * pagination.per_page;
833        rows = rows
834            .into_iter()
835            .skip(offset)
836            .take(pagination.per_page)
837            .collect();
838    }
839
840    rows
841}
842
843fn apply_keyset(rows: Vec<StoredRow>, keyset: &crate::query::KeysetPagination) -> Vec<StoredRow> {
844    let limit = keyset.limit.max(1);
845    let Some(cursor) = keyset.cursor.as_ref() else {
846        return rows.into_iter().take(limit).collect();
847    };
848
849    match cursor.direction {
850        KeysetDirection::Forward => rows
851            .into_iter()
852            .filter(|row| compare_row_cursor(row, cursor) == Ordering::Greater)
853            .take(limit)
854            .collect(),
855        KeysetDirection::Backward => {
856            let mut filtered = rows
857                .into_iter()
858                .filter(|row| compare_row_cursor(row, cursor) == Ordering::Less)
859                .collect::<Vec<_>>();
860            let keep_from = filtered.len().saturating_sub(limit);
861            filtered.drain(0..keep_from);
862            filtered
863        }
864    }
865}
866
867fn compare_row_cursor(row: &StoredRow, cursor: &crate::query::KeysetCursor) -> Ordering {
868    row.data
869        .get(&cursor.field)
870        .map(|value| compare_json_values(value, &cursor.value))
871        .unwrap_or_else(|| row.id.cmp(&cursor.value.as_u64().unwrap_or_default()))
872}
873
874fn compare_json_values(left: &Value, right: &Value) -> Ordering {
875    match (left, right) {
876        (Value::Number(_), Value::Number(_)) => {
877            compare_numbers(left, right).unwrap_or(Ordering::Equal)
878        }
879        (Value::String(left), Value::String(right)) => left.cmp(right),
880        (Value::Bool(left), Value::Bool(right)) => left.cmp(right),
881        _ => left.to_string().cmp(&right.to_string()),
882    }
883}
884
885fn window_range_from_query(query: &Query, total_rows: usize) -> (usize, usize) {
886    if let Some(window) = query.window {
887        let span = window.span().max(1);
888        let overscan = window.overscan;
889        let start = window.start.saturating_sub(overscan);
890        let limit = span.saturating_add(overscan.saturating_mul(2)).max(1);
891        if total_rows == 0 {
892            (0, limit)
893        } else {
894            (start.min(total_rows.saturating_sub(1)), limit)
895        }
896    } else if let Some(pagination) = query.pagination {
897        (
898            (pagination.page.saturating_sub(1)) * pagination.per_page,
899            pagination.per_page.max(1),
900        )
901    } else if let Some(keyset) = &query.keyset {
902        (
903            query
904                .window_token
905                .as_ref()
906                .map(|token| token.offset)
907                .unwrap_or(0),
908            keyset.limit.max(1),
909        )
910    } else {
911        (
912            query
913                .window_token
914                .as_ref()
915                .map(|token| token.offset)
916                .unwrap_or(0),
917            query
918                .window_token
919                .as_ref()
920                .map(|token| token.limit)
921                .unwrap_or_else(|| total_rows.max(1)),
922        )
923    }
924}
925
926fn encode_compact_rows(rows: &[StoredRow]) -> CompactRowsPayload {
927    let mut columns = vec!["id".to_string()];
928    for row in rows {
929        for key in row.data.keys() {
930            if !columns.iter().any(|column| column == key) {
931                columns.push(key.clone());
932            }
933        }
934    }
935
936    let mut encoded_rows = Vec::with_capacity(rows.len());
937    for row in rows {
938        let mut encoded = Vec::with_capacity(columns.len());
939        for column in &columns {
940            if column == "id" {
941                encoded.push(Value::from(row.id));
942            } else {
943                encoded.push(row.data.get(column).cloned().unwrap_or(Value::Null));
944            }
945        }
946        encoded_rows.push(encoded);
947    }
948
949    CompactRowsPayload {
950        columns,
951        rows: encoded_rows,
952    }
953}
954
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The
/// `u128` millisecond count is clamped to `u64::MAX` before narrowing, so the
/// conversion can never silently truncate.
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // After the clamp the value fits in u64, so the cast is lossless.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or_default()
}
961
962fn matches_filter(row: &StoredRow, filter: &crate::query::Filter) -> bool {
963    let Some(candidate) = row.data.get(&filter.field) else {
964        return false;
965    };
966    match filter.op {
967        FilterOperator::Eq => candidate == &filter.value,
968        FilterOperator::Neq => candidate != &filter.value,
969        FilterOperator::Contains => candidate
970            .as_str()
971            .zip(filter.value.as_str())
972            .is_some_and(|(left, right)| left.contains(right)),
973        FilterOperator::Gt => {
974            compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Greater)
975        }
976        FilterOperator::Gte => compare_numbers(candidate, &filter.value)
977            .is_some_and(|ord| ord == Ordering::Greater || ord == Ordering::Equal),
978        FilterOperator::Lt => {
979            compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Less)
980        }
981        FilterOperator::Lte => compare_numbers(candidate, &filter.value)
982            .is_some_and(|ord| ord == Ordering::Less || ord == Ordering::Equal),
983    }
984}
985
986fn compare_for_sort(left: &StoredRow, right: &StoredRow, field: &str) -> Ordering {
987    let left_value = left.data.get(field);
988    let right_value = right.data.get(field);
989    match (left_value, right_value) {
990        (Some(Value::Number(left_num)), Some(Value::Number(right_num))) => left_num
991            .as_f64()
992            .partial_cmp(&right_num.as_f64())
993            .unwrap_or(Ordering::Equal),
994        (Some(Value::String(left_text)), Some(Value::String(right_text))) => {
995            left_text.cmp(right_text)
996        }
997        _ => left.id.cmp(&right.id),
998    }
999}
1000
1001fn compare_numbers(left: &Value, right: &Value) -> Option<Ordering> {
1002    left.as_f64()
1003        .zip(right.as_f64())
1004        .and_then(|(left, right)| left.partial_cmp(&right))
1005}
1006
// Unit tests for this file: adapter selection, basic CRUD on the in-memory
// repo, tenant-scoped access, filtering/sorting/pagination, keyset paging,
// windowed listing and incremental diffs.
1007#[cfg(test)]
1008mod tests {
1009    use super::{
1010        adapter_for, DatabaseConfig, MemoryRepo, Repo, Row, TenantPolicyDecision,
1011        TenantRepoOperation, TenantScopedRepo,
1012    };
1013    use crate::{
1014        AdapterKind, DataError, Filter, FilterOperator, Query, QueryContext, SortDirection,
1015        WireFormatProfile,
1016    };
1017    use serde_json::json;
1018
    // Builds a MemoryRepo on the driver returned for a Sqlite config, inserts
    // one row and checks that a `contains` filter combined with a descending
    // sort returns it.
1019    #[test]
1020    fn memory_repo_works_for_adapter_selection() {
1021        let mut repo = MemoryRepo::new(
1022            adapter_for(&DatabaseConfig {
1023                adapter: AdapterKind::Sqlite,
1024                url: None,
1025                url_env: None,
1026            })
1027            .unwrap(),
1028        );
1029        let mut row = Row::new();
1030        row.insert("title".to_string(), json!("Alpha"));
1031        row.insert("score".to_string(), json!(10));
1032        repo.insert("posts", row).unwrap();
1033
1034        let rows = repo
1035            .list(
1036                "posts",
1037                &Query::new()
1038                    .where_filter(Filter::contains("title", "Al"))
1039                    .order_by("score", SortDirection::Desc),
1040            )
1041            .unwrap();
1042        assert_eq!(rows.len(), 1);
1043        assert_eq!(rows[0].data.get("title"), Some(&json!("Alpha")));
1044    }
1045
    // `AdapterKind::None` must be rejected with `DataError::Adapter`; every
    // other kind listed below must yield a driver reporting that same kind.
1046    #[test]
1047    fn adapter_for_rejects_none_and_selects_expected_driver() {
1048        let none_result = adapter_for(&DatabaseConfig {
1049            adapter: AdapterKind::None,
1050            url: None,
1051            url_env: None,
1052        });
1053        assert!(matches!(none_result, Err(DataError::Adapter(_))));
1054
1055        for kind in [
1056            AdapterKind::Postgres,
1057            AdapterKind::MySql,
1058            AdapterKind::Sqlite,
1059            AdapterKind::SingleStore,
1060            AdapterKind::ClickHouse,
1061            AdapterKind::BigQuery,
1062            AdapterKind::OpenSearch,
1063        ] {
1064            let driver = adapter_for(&DatabaseConfig {
1065                adapter: kind,
1066                url: None,
1067                url_env: None,
1068            })
1069            .expect("driver should be created");
1070            assert_eq!(driver.kind(), kind);
1071        }
1072    }
1073
    // `find` returns `None` (not an error) for an unknown id or table, while
    // `update` and `delete` on a missing row fail with `DataError::Query`.
1074    #[test]
1075    fn update_delete_and_find_cover_missing_rows() {
1076        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1077
1078        let mut row = Row::new();
1079        row.insert("title".to_string(), json!("Draft"));
1080        let inserted = repo.insert("posts", row).expect("insert should work");
1081
1082        assert_eq!(
1083            repo.find("posts", inserted.id)
1084                .expect("find should not fail")
1085                .map(|it| it.id),
1086            Some(inserted.id)
1087        );
1088        assert!(repo
1089            .find("posts", 999)
1090            .expect("find should not fail")
1091            .is_none());
1092        assert!(repo
1093            .find("missing_table", inserted.id)
1094            .expect("find should not fail")
1095            .is_none());
1096
1097        let mut updated = Row::new();
1098        updated.insert("title".to_string(), json!("Published"));
1099        let updated_row = repo
1100            .update("posts", inserted.id, updated)
1101            .expect("update should work");
1102        assert_eq!(updated_row.data.get("title"), Some(&json!("Published")));
1103
1104        let update_err = repo
1105            .update("posts", 404, Row::new())
1106            .expect_err("missing row should fail update");
1107        assert!(matches!(update_err, DataError::Query(_)));
1108
        // The second delete targets the row that was just removed.
1109        repo.delete("posts", inserted.id)
1110            .expect("delete should remove row");
1111        let delete_err = repo
1112            .delete("posts", inserted.id)
1113            .expect_err("deleting missing row should fail");
1114        assert!(matches!(delete_err, DataError::Query(_)));
1115    }
1116
    // A row inserted under tenant-a can be found by tenant-a; the same lookup
    // from tenant-b fails with an error mentioning `tenant_row_mismatch`.
1117    #[test]
1118    fn tenant_scoped_repo_enforces_row_level_isolation() {
1119        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1120        let mut scoped = TenantScopedRepo::new(repo);
1121
1122        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1123        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1124
1125        let mut first = Row::new();
1126        first.insert("name".to_string(), json!("Acme"));
1127        let inserted = scoped
1128            .insert_scoped("accounts", &tenant_a, first)
1129            .expect("insert tenant-a row");
1130
1131        let found = scoped
1132            .find_scoped("accounts", inserted.id, &tenant_a)
1133            .expect("find tenant-a row");
1134        assert!(found.is_some());
1135
1136        let denied = scoped
1137            .find_scoped("accounts", inserted.id, &tenant_b)
1138            .expect_err("cross-tenant find should be denied");
1139        assert!(denied.to_string().contains("tenant_row_mismatch"));
1140    }
1141
    // Inserting with a default `QueryContext` (no tenant id is set on it here)
    // is rejected with an error mentioning `tenant_context_required`.
1142    #[test]
1143    fn tenant_scoped_repo_requires_tenant_context_by_default() {
1144        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1145        let mut scoped = TenantScopedRepo::new(repo);
1146
1147        let mut row = Row::new();
1148        row.insert("name".to_string(), json!("NoTenant"));
1149        let err = scoped
1150            .insert_scoped("accounts", &QueryContext::default(), row)
1151            .expect_err("missing tenant context should be denied");
1152        assert!(err.to_string().contains("tenant_context_required"));
1153    }
1154
    // A policy hook that denies only `Delete` lets the insert through and
    // makes `delete_scoped` fail with the hook's own code in the error text.
1155    #[test]
1156    fn tenant_scoped_repo_policy_hook_can_reject_operations() {
1157        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1158        let mut scoped = TenantScopedRepo::new(repo).with_policy_hook(|ctx| {
1159            if ctx.operation == TenantRepoOperation::Delete {
1160                TenantPolicyDecision::deny("tenant_delete_denied", "tenant delete denied")
1161            } else {
1162                TenantPolicyDecision::allow()
1163            }
1164        });
1165
1166        let tenant = QueryContext::default().with_tenant_id("tenant-a");
1167        let mut row = Row::new();
1168        row.insert("name".to_string(), json!("Acme"));
1169        let inserted = scoped
1170            .insert_scoped("accounts", &tenant, row)
1171            .expect("insert tenant row");
1172
1173        let err = scoped
1174            .delete_scoped("accounts", inserted.id, &tenant)
1175            .expect_err("policy should reject delete");
1176        assert!(err.to_string().contains("tenant_delete_denied"));
1177    }
1178
    // Exercises the scoped update, list and window-list paths for the owning
    // tenant and re-checks that another tenant still cannot read the row.
1179    #[test]
1180    fn tenant_scoped_repo_update_and_window_list_enforce_scope() {
1181        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1182        let mut scoped = TenantScopedRepo::new(repo);
1183
1184        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1185        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1186
1187        let mut first = Row::new();
1188        first.insert("name".to_string(), json!("Acme"));
1189        first.insert("score".to_string(), json!(10));
1190        let inserted = scoped
1191            .insert_scoped("accounts", &tenant_a, first)
1192            .expect("insert tenant-a row");
1193
        // The update payload carries no `tenant_id`, yet the returned row is
        // expected to have it set to the caller's tenant.
1194        let mut update = Row::new();
1195        update.insert("name".to_string(), json!("Acme Prime"));
1196        update.insert("score".to_string(), json!(20));
1197        let updated = scoped
1198            .update_scoped("accounts", inserted.id, &tenant_a, update)
1199            .expect("update tenant-a row");
1200        assert_eq!(updated.data.get("tenant_id"), Some(&json!("tenant-a")));
1201
        // A payload that names a different tenant than the context must fail.
1202        let mut bad_update = Row::new();
1203        bad_update.insert("name".to_string(), json!("Cross"));
1204        bad_update.insert("tenant_id".to_string(), json!("tenant-b"));
1205        let err = scoped
1206            .update_scoped("accounts", inserted.id, &tenant_a, bad_update)
1207            .expect_err("tenant mismatch on row payload should fail");
1208        assert!(err.to_string().contains("tenant_row_mismatch"));
1209
1210        let list = scoped
1211            .list_scoped(
1212                "accounts",
1213                &Query::new().order_by("score", SortDirection::Asc),
1214                &tenant_a,
1215            )
1216            .expect("list scoped");
1217        assert_eq!(list.len(), 1);
1218
1219        let window = scoped
1220            .list_window_scoped(
1221                "accounts",
1222                &Query::new().order_by("score", SortDirection::Asc).window(1, 5, 2),
1223                &tenant_a,
1224            )
1225            .expect("list window scoped");
1226        assert_eq!(window.rows.len(), 1);
1227
1228        let denied = scoped
1229            .find_scoped("accounts", inserted.id, &tenant_b)
1230            .expect_err("cross-tenant find should still fail");
1231        assert!(denied.to_string().contains("tenant_row_mismatch"));
1232    }
1233
    // Covers every filter operator plus single- and multi-key sorting over
    // three rows (Alpha/10, Beta/20, Gamma/15).
1234    #[test]
1235    fn list_applies_filters_sorts_and_pagination() {
1236        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1237
1238        let mut alpha = Row::new();
1239        alpha.insert("title".to_string(), json!("Alpha"));
1240        alpha.insert("score".to_string(), json!(10));
1241        alpha.insert("tag".to_string(), json!("core"));
1242        repo.insert("posts", alpha).expect("insert alpha");
1243
1244        let mut beta = Row::new();
1245        beta.insert("title".to_string(), json!("Beta"));
1246        beta.insert("score".to_string(), json!(20));
1247        beta.insert("tag".to_string(), json!("ops"));
1248        repo.insert("posts", beta).expect("insert beta");
1249
        // Gamma's `tag` is deliberately a number so that the `contains`
        // check on `tag` below runs against a non-string value.
1250        let mut gamma = Row::new();
1251        gamma.insert("title".to_string(), json!("Gamma"));
1252        gamma.insert("score".to_string(), json!(15));
1253        gamma.insert("tag".to_string(), json!(123));
1254        repo.insert("posts", gamma).expect("insert gamma");
1255
1256        let eq_rows = repo
1257            .list(
1258                "posts",
1259                &Query::new().where_filter(Filter::eq("title", json!("Alpha"))),
1260            )
1261            .expect("eq filter");
1262        assert_eq!(eq_rows.len(), 1);
1263        assert_eq!(eq_rows[0].data.get("title"), Some(&json!("Alpha")));
1264
1265        let neq_rows = repo
1266            .list(
1267                "posts",
1268                &Query::new().where_filter(crate::Filter {
1269                    field: "title".to_string(),
1270                    op: FilterOperator::Neq,
1271                    value: json!("Alpha"),
1272                }),
1273            )
1274            .expect("neq filter");
1275        assert_eq!(neq_rows.len(), 2);
1276
1277        let contains_rows = repo
1278            .list(
1279                "posts",
1280                &Query::new().where_filter(Filter::contains("title", "mm")),
1281            )
1282            .expect("contains filter");
1283        assert_eq!(contains_rows.len(), 1);
1284        assert_eq!(contains_rows[0].data.get("title"), Some(&json!("Gamma")));
1285
        // "2" appears in the numeric tag 123, but `contains` only matches
        // string values, so nothing is returned.
1286        let contains_non_string_rows = repo
1287            .list(
1288                "posts",
1289                &Query::new().where_filter(Filter::contains("tag", "2")),
1290            )
1291            .expect("contains on mixed type");
1292        assert!(contains_non_string_rows.is_empty());
1293
        // Each ordering operator is applied to `score` against the pivot 15;
        // expected titles are listed in the order the rows come back.
1294        for (op, expected_titles) in [
1295            (FilterOperator::Gt, vec!["Beta"]),
1296            (FilterOperator::Gte, vec!["Beta", "Gamma"]),
1297            (FilterOperator::Lt, vec!["Alpha"]),
1298            (FilterOperator::Lte, vec!["Alpha", "Gamma"]),
1299        ] {
1300            let rows = repo
1301                .list(
1302                    "posts",
1303                    &Query::new().where_filter(crate::Filter {
1304                        field: "score".to_string(),
1305                        op,
1306                        value: json!(15),
1307                    }),
1308                )
1309                .expect("numeric filter");
1310            let titles: Vec<&str> = rows
1311                .iter()
1312                .map(|row| {
1313                    row.data
1314                        .get("title")
1315                        .and_then(|value| value.as_str())
1316                        .expect("title")
1317                })
1318                .collect();
1319            assert_eq!(titles, expected_titles);
1320        }
1321
        // No row has a `missing` field, so the sort falls back to row ids;
        // descending order with page 1 of size 2 yields ids 3 and 2.
1322        let unknown_field_sort = repo
1323            .list(
1324                "posts",
1325                &Query::new()
1326                    .order_by("missing", SortDirection::Desc)
1327                    .paginate(1, 2),
1328            )
1329            .expect("fallback sort");
1330        assert_eq!(unknown_field_sort.len(), 2);
1331        assert_eq!(unknown_field_sort[0].id, 3);
1332        assert_eq!(unknown_field_sort[1].id, 2);
1333
1334        let score_sort = repo
1335            .list(
1336                "posts",
1337                &Query::new()
1338                    .order_by("score", SortDirection::Desc)
1339                    .order_by("title", SortDirection::Asc),
1340            )
1341            .expect("score sort");
1342        let score_titles: Vec<&str> = score_sort
1343            .iter()
1344            .map(|row| {
1345                row.data
1346                    .get("title")
1347                    .and_then(|value| value.as_str())
1348                    .expect("title")
1349            })
1350            .collect();
1351        assert_eq!(score_titles, vec!["Beta", "Gamma", "Alpha"]);
1352    }
1353
    // With scores 1..=6 sorted ascending: "after 2, limit 3" returns 3,4,5 and
    // "before 5, limit 2" returns the two rows closest to the cursor, 3,4.
1354    #[test]
1355    fn keyset_pagination_supports_forward_and_backward_windows() {
1356        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1357
1358        for score in 1..=6 {
1359            let mut row = Row::new();
1360            row.insert("title".to_string(), json!(format!("R{score}")));
1361            row.insert("score".to_string(), json!(score));
1362            repo.insert("scores", row).expect("insert score row");
1363        }
1364
1365        let forward_rows = repo
1366            .list(
1367                "scores",
1368                &Query::new()
1369                    .order_by("score", SortDirection::Asc)
1370                    .keyset_after("score", json!(2), 3),
1371            )
1372            .expect("keyset forward");
1373        let forward_scores: Vec<i64> = forward_rows
1374            .iter()
1375            .map(|row| {
1376                row.data
1377                    .get("score")
1378                    .and_then(|value| value.as_i64())
1379                    .expect("score")
1380            })
1381            .collect();
1382        assert_eq!(forward_scores, vec![3, 4, 5]);
1383
1384        let backward_rows = repo
1385            .list(
1386                "scores",
1387                &Query::new()
1388                    .order_by("score", SortDirection::Asc)
1389                    .keyset_before("score", json!(5), 2),
1390            )
1391            .expect("keyset backward");
1392        let backward_scores: Vec<i64> = backward_rows
1393            .iter()
1394            .map(|row| {
1395                row.data
1396                    .get("score")
1397                    .and_then(|value| value.as_i64())
1398                    .expect("score")
1399            })
1400            .collect();
1401        assert_eq!(backward_scores, vec![3, 4]);
1402    }
1403
    // Windowed listing over 300 rows: checks the reported offset/limit, that
    // the page and its token carry the query fingerprint, and that the compact
    // wire format produces one encoded row per returned row.
1404    #[test]
1405    fn list_window_emits_tokens_and_compact_payload() {
1406        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1407
1408        for score in 0..300 {
1409            let mut row = Row::new();
1410            row.insert("score".to_string(), json!(score));
1411            row.insert(
1412                "tenant".to_string(),
1413                json!(if score % 2 == 0 { "a" } else { "b" }),
1414            );
1415            repo.insert("accounts", row).expect("insert account");
1416        }
1417
1418        let query = Query::new()
1419            .order_by("score", SortDirection::Asc)
1420            .window(100, 240, 20)
1421            .wire_format(WireFormatProfile::Compact);
1422        let page = repo.list_window("accounts", &query).expect("list window");
1423
        // Expected offset 80 = 100 - 20 and limit 180 = 140 + 2 * 20,
        // presumably because `window(100, 240, 20)` means start 100, end 240,
        // overscan 20 (verify against `Query::window`).
1424        assert_eq!(page.total_rows, 300);
1425        assert_eq!(page.offset, 80);
1426        assert_eq!(page.limit, 180);
1427        assert_eq!(page.rows.len(), 180);
1428        assert_eq!(page.query_fingerprint, query.fingerprint());
1429        assert_eq!(page.token.query_fingerprint, query.fingerprint());
1430        let compact = page.compact_rows.expect("compact payload");
1431        assert!(compact.columns.contains(&"id".to_string()));
1432        assert!(compact.columns.contains(&"score".to_string()));
1433        assert_eq!(compact.rows.len(), page.rows.len());
1434    }
1435
    // A window token minted from one query must be rejected when attached to
    // a query with a different filter value.
1436    #[test]
1437    fn list_window_rejects_mismatched_query_token() {
1438        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1439        let mut row = Row::new();
1440        row.insert("score".to_string(), json!(1));
1441        repo.insert("accounts", row).expect("insert account");
1442
1443        let base = Query::new()
1444            .where_filter(Filter::eq("score", json!(1)))
1445            .paginate(1, 20);
1446        let stale_token = base.next_window_token(0, 20, 42, 1);
1447        let mismatched = Query::new()
1448            .where_filter(Filter::eq("score", json!(2)))
1449            .paginate(1, 20)
1450            .with_window_token(stale_token);
1451
1452        let err = repo
1453            .list_window("accounts", &mismatched)
1454            .expect_err("window token mismatch should fail");
1455        assert!(matches!(err, DataError::Query(_)));
1456        assert!(err.to_string().contains("window token"));
1457    }
1458
    // Takes two snapshots of the same window around one update, one delete
    // and one insert, then checks the diff reports each change by row id.
1459    #[test]
1460    fn incremental_diff_tracks_insert_update_and_remove() {
1461        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1462
1463        for (title, score) in [("A", 10), ("B", 20), ("C", 30)] {
1464            let mut row = Row::new();
1465            row.insert("title".to_string(), json!(title));
1466            row.insert("score".to_string(), json!(score));
1467            repo.insert("accounts", row).expect("insert");
1468        }
1469
1470        let query = Query::new()
1471            .order_by("score", SortDirection::Asc)
1472            .paginate(1, 10);
1473        let previous = repo.list_window("accounts", &query).expect("previous page");
1474
        // Mutations between the snapshots: row 2 changes score, row 3 is
        // deleted, and a new row is added (asserted below to receive id 4).
1475        let mut updated_row = Row::new();
1476        updated_row.insert("title".to_string(), json!("B"));
1477        updated_row.insert("score".to_string(), json!(25));
1478        repo.update("accounts", 2, updated_row).expect("update row");
1479        repo.delete("accounts", 3).expect("delete row");
1480        let mut inserted_row = Row::new();
1481        inserted_row.insert("title".to_string(), json!("D"));
1482        inserted_row.insert("score".to_string(), json!(35));
1483        repo.insert("accounts", inserted_row).expect("insert row");
1484
1485        let current = repo.list_window("accounts", &query).expect("current page");
1486        let diff = repo.materialize_incremental_diff(&previous, &current);
1487
1488        assert!(!diff.full_resync);
1489        assert_eq!(diff.inserted.len(), 1);
1490        assert_eq!(diff.inserted[0].id, 4);
1491        assert_eq!(diff.updated.len(), 1);
1492        assert_eq!(diff.updated[0].id, 2);
1493        assert_eq!(diff.removed_ids, vec![3]);
1494    }
1495}