1use crate::{
2 adapter::{AdapterKind, DatabaseConfig},
3 error::{DataError, DataResult},
4 query::{
5 Filter, FilterOperator, KeysetDirection, Query, SortDirection, WindowToken,
6 WireFormatProfile,
7 },
8 QueryContext,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{
13 cmp::Ordering,
14 collections::{BTreeMap, HashMap},
15 sync::Arc,
16 time::{Instant, SystemTime, UNIX_EPOCH},
17};
18use tracing::{info, warn};
19
20pub type Row = BTreeMap<String, Value>;
21
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23pub struct StoredRow {
24 pub id: u64,
25 pub data: Row,
26}
27
28#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
29pub struct CompactRowsPayload {
30 pub columns: Vec<String>,
31 pub rows: Vec<Vec<Value>>,
32}
33
34#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
35pub struct WindowPage {
36 pub offset: usize,
37 pub limit: usize,
38 pub total_rows: usize,
39 pub rows: Vec<StoredRow>,
40 pub query_fingerprint: String,
41 pub token: WindowToken,
42 pub wire_format: WireFormatProfile,
43 pub compact_rows: Option<CompactRowsPayload>,
44}
45
46#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
47pub struct IncrementalDiff {
48 pub from_token: Option<WindowToken>,
49 pub to_token: WindowToken,
50 pub full_resync: bool,
51 pub inserted: Vec<StoredRow>,
52 pub updated: Vec<StoredRow>,
53 pub removed_ids: Vec<u64>,
54}
55
56pub trait AdapterDriver: Send + Sync {
57 fn kind(&self) -> AdapterKind;
58}
59
60#[derive(Debug, Default, Clone, Copy)]
61pub struct PostgresAdapter;
62
63impl AdapterDriver for PostgresAdapter {
64 fn kind(&self) -> AdapterKind {
65 AdapterKind::Postgres
66 }
67}
68
69#[derive(Debug, Default, Clone, Copy)]
70pub struct MySqlAdapter;
71
72impl AdapterDriver for MySqlAdapter {
73 fn kind(&self) -> AdapterKind {
74 AdapterKind::MySql
75 }
76}
77
78#[derive(Debug, Default, Clone, Copy)]
79pub struct SqliteAdapter;
80
81impl AdapterDriver for SqliteAdapter {
82 fn kind(&self) -> AdapterKind {
83 AdapterKind::Sqlite
84 }
85}
86
87#[derive(Debug, Default, Clone, Copy)]
88pub struct SingleStoreDriver;
89
90impl AdapterDriver for SingleStoreDriver {
91 fn kind(&self) -> AdapterKind {
92 AdapterKind::SingleStore
93 }
94}
95
96#[derive(Debug, Default, Clone, Copy)]
97pub struct ClickHouseDriver;
98
99impl AdapterDriver for ClickHouseDriver {
100 fn kind(&self) -> AdapterKind {
101 AdapterKind::ClickHouse
102 }
103}
104
105#[derive(Debug, Default, Clone, Copy)]
106pub struct BigQueryDriver;
107
108impl AdapterDriver for BigQueryDriver {
109 fn kind(&self) -> AdapterKind {
110 AdapterKind::BigQuery
111 }
112}
113
114#[derive(Debug, Default, Clone, Copy)]
115pub struct OpenSearchAdapterDriver;
116
117impl AdapterDriver for OpenSearchAdapterDriver {
118 fn kind(&self) -> AdapterKind {
119 AdapterKind::OpenSearch
120 }
121}
122
123pub fn adapter_for(config: &DatabaseConfig) -> DataResult<Box<dyn AdapterDriver>> {
124 match config.adapter {
125 AdapterKind::Postgres => Ok(Box::new(PostgresAdapter)),
126 AdapterKind::MySql => Ok(Box::new(MySqlAdapter)),
127 AdapterKind::Sqlite => Ok(Box::new(SqliteAdapter)),
128 AdapterKind::SingleStore => Ok(Box::new(SingleStoreDriver)),
129 AdapterKind::ClickHouse => Ok(Box::new(ClickHouseDriver)),
130 AdapterKind::BigQuery => Ok(Box::new(BigQueryDriver)),
131 AdapterKind::OpenSearch => Ok(Box::new(OpenSearchAdapterDriver)),
132 AdapterKind::None => Err(DataError::Adapter(
133 "database adapter is `none`; select a supported backend in shelly.data.toml"
134 .to_string(),
135 )),
136 }
137}
138
139pub trait Repo {
140 fn adapter_kind(&self) -> AdapterKind;
141 fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow>;
142 fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow>;
143 fn delete(&mut self, table: &str, id: u64) -> DataResult<()>;
144 fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>>;
145 fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>>;
146 fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage>;
147 fn materialize_incremental_diff(
148 &self,
149 previous: &WindowPage,
150 current: &WindowPage,
151 ) -> IncrementalDiff;
152}
153
154#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
155#[serde(rename_all = "snake_case")]
156pub enum TenantRepoOperation {
157 Insert,
158 Update,
159 Delete,
160 Find,
161 List,
162 ListWindow,
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct TenantPolicyContext {
167 pub table: String,
168 pub operation: TenantRepoOperation,
169 pub tenant_id: Option<String>,
170 pub row_tenant_id: Option<String>,
171 pub row_id: Option<u64>,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175pub struct TenantPolicyDecision {
176 pub allowed: bool,
177 pub code: Option<String>,
178 pub message: Option<String>,
179}
180
181impl TenantPolicyDecision {
182 pub fn allow() -> Self {
183 Self {
184 allowed: true,
185 code: None,
186 message: None,
187 }
188 }
189
190 pub fn deny(code: impl Into<String>, message: impl Into<String>) -> Self {
191 Self {
192 allowed: false,
193 code: Some(code.into()),
194 message: Some(message.into()),
195 }
196 }
197}
198
199pub type TenantPolicyHook = Arc<dyn Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync>;
200
201#[derive(Clone)]
202pub struct TenantRepoConfig {
203 pub tenant_field: String,
204 pub require_tenant_context: bool,
205 pub policy_hook: Option<TenantPolicyHook>,
206}
207
208impl Default for TenantRepoConfig {
209 fn default() -> Self {
210 Self {
211 tenant_field: "tenant_id".to_string(),
212 require_tenant_context: true,
213 policy_hook: None,
214 }
215 }
216}
217
218pub struct TenantScopedRepo<R: Repo> {
219 inner: R,
220 config: TenantRepoConfig,
221}
222
223impl<R: Repo> TenantScopedRepo<R> {
224 pub fn new(inner: R) -> Self {
225 Self {
226 inner,
227 config: TenantRepoConfig::default(),
228 }
229 }
230
231 pub fn with_config(mut self, config: TenantRepoConfig) -> Self {
232 self.config = config;
233 self
234 }
235
236 pub fn with_policy_hook<F>(mut self, policy_hook: F) -> Self
237 where
238 F: Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync + 'static,
239 {
240 self.config.policy_hook = Some(Arc::new(policy_hook));
241 self
242 }
243
244 pub fn inner(&self) -> &R {
245 &self.inner
246 }
247
248 pub fn inner_mut(&mut self) -> &mut R {
249 &mut self.inner
250 }
251
252 pub fn into_inner(self) -> R {
253 self.inner
254 }
255
256 pub fn insert_scoped(
257 &mut self,
258 table: &str,
259 context: &QueryContext,
260 mut data: Row,
261 ) -> DataResult<StoredRow> {
262 let tenant_id = self.required_tenant_id(context)?;
263 let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
264 self.ensure_tenant_match(
265 table,
266 TenantRepoOperation::Insert,
267 tenant_id.as_deref(),
268 row_tenant_id.as_deref(),
269 None,
270 )?;
271 if let Some(tenant_id) = tenant_id.as_deref() {
272 data.insert(
273 self.config.tenant_field.clone(),
274 Value::String(tenant_id.to_string()),
275 );
276 }
277 self.inner.insert(table, data)
278 }
279
280 pub fn update_scoped(
281 &mut self,
282 table: &str,
283 id: u64,
284 context: &QueryContext,
285 mut data: Row,
286 ) -> DataResult<StoredRow> {
287 let tenant_id = self.required_tenant_id(context)?;
288 let existing = self.inner.find(table, id)?;
289 if let Some(existing) = existing.as_ref() {
290 self.ensure_tenant_match(
291 table,
292 TenantRepoOperation::Update,
293 tenant_id.as_deref(),
294 tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
295 Some(id),
296 )?;
297 }
298 let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
299 self.ensure_tenant_match(
300 table,
301 TenantRepoOperation::Update,
302 tenant_id.as_deref(),
303 row_tenant_id.as_deref(),
304 Some(id),
305 )?;
306 if let Some(tenant_id) = tenant_id.as_deref() {
307 data.insert(
308 self.config.tenant_field.clone(),
309 Value::String(tenant_id.to_string()),
310 );
311 }
312 self.inner.update(table, id, data)
313 }
314
315 pub fn delete_scoped(
316 &mut self,
317 table: &str,
318 id: u64,
319 context: &QueryContext,
320 ) -> DataResult<()> {
321 let tenant_id = self.required_tenant_id(context)?;
322 let existing = self.inner.find(table, id)?;
323 if let Some(existing) = existing.as_ref() {
324 self.ensure_tenant_match(
325 table,
326 TenantRepoOperation::Delete,
327 tenant_id.as_deref(),
328 tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
329 Some(id),
330 )?;
331 }
332 self.inner.delete(table, id)
333 }
334
335 pub fn find_scoped(
336 &self,
337 table: &str,
338 id: u64,
339 context: &QueryContext,
340 ) -> DataResult<Option<StoredRow>> {
341 let tenant_id = self.required_tenant_id(context)?;
342 let row = self.inner.find(table, id)?;
343 if let Some(row) = row.as_ref() {
344 self.ensure_tenant_match(
345 table,
346 TenantRepoOperation::Find,
347 tenant_id.as_deref(),
348 tenant_id_from_row(&row.data, self.config.tenant_field.as_str()).as_deref(),
349 Some(id),
350 )?;
351 }
352 Ok(row)
353 }
354
355 pub fn list_scoped(
356 &self,
357 table: &str,
358 query: &Query,
359 context: &QueryContext,
360 ) -> DataResult<Vec<StoredRow>> {
361 let tenant_id = self.required_tenant_id(context)?;
362 self.evaluate_policy(TenantPolicyContext {
363 table: table.to_string(),
364 operation: TenantRepoOperation::List,
365 tenant_id: tenant_id.clone(),
366 row_tenant_id: None,
367 row_id: None,
368 })?;
369 let scoped_query = self.scoped_query(query, tenant_id.as_deref());
370 self.inner.list(table, &scoped_query)
371 }
372
373 pub fn list_window_scoped(
374 &self,
375 table: &str,
376 query: &Query,
377 context: &QueryContext,
378 ) -> DataResult<WindowPage> {
379 let tenant_id = self.required_tenant_id(context)?;
380 self.evaluate_policy(TenantPolicyContext {
381 table: table.to_string(),
382 operation: TenantRepoOperation::ListWindow,
383 tenant_id: tenant_id.clone(),
384 row_tenant_id: None,
385 row_id: None,
386 })?;
387 let scoped_query = self.scoped_query(query, tenant_id.as_deref());
388 self.inner.list_window(table, &scoped_query)
389 }
390
391 fn scoped_query(&self, query: &Query, tenant_id: Option<&str>) -> Query {
392 let mut scoped = query.clone();
393 if let Some(tenant_id) = tenant_id {
394 scoped.filters.push(Filter::eq(
395 self.config.tenant_field.clone(),
396 Value::String(tenant_id.to_string()),
397 ));
398 }
399 scoped
400 }
401
402 fn required_tenant_id(&self, context: &QueryContext) -> DataResult<Option<String>> {
403 let tenant_id = normalize_tenant_id(context.tenant_id.as_deref());
404 if self.config.require_tenant_context && tenant_id.is_none() {
405 return Err(tenant_error(
406 "tenant_context_required",
407 "tenant context is required for scoped data operation",
408 ));
409 }
410 Ok(tenant_id)
411 }
412
413 fn ensure_tenant_match(
414 &self,
415 table: &str,
416 operation: TenantRepoOperation,
417 tenant_id: Option<&str>,
418 row_tenant_id: Option<&str>,
419 row_id: Option<u64>,
420 ) -> DataResult<()> {
421 let normalized_tenant_id = normalize_tenant_id(tenant_id);
422 let normalized_row_tenant_id = normalize_tenant_id(row_tenant_id);
423 if let Some(expected_tenant) = normalized_tenant_id.as_deref() {
424 if let Some(row_tenant) = normalized_row_tenant_id.as_deref() {
425 if row_tenant != expected_tenant {
426 return Err(tenant_error(
427 "tenant_row_mismatch",
428 format!(
429 "row tenant does not match tenant context for table `{table}` (row_id={row_id:?})"
430 ),
431 ));
432 }
433 }
434 }
435 self.evaluate_policy(TenantPolicyContext {
436 table: table.to_string(),
437 operation,
438 tenant_id: normalized_tenant_id,
439 row_tenant_id: normalized_row_tenant_id,
440 row_id,
441 })
442 }
443
444 fn evaluate_policy(&self, context: TenantPolicyContext) -> DataResult<()> {
445 let Some(policy_hook) = self.config.policy_hook.as_ref() else {
446 return Ok(());
447 };
448 let decision = policy_hook(&context);
449 if decision.allowed {
450 Ok(())
451 } else {
452 Err(tenant_error(
453 decision.code.as_deref().unwrap_or("tenant_policy_denied"),
454 decision
455 .message
456 .as_deref()
457 .unwrap_or("tenant policy denied data operation"),
458 ))
459 }
460 }
461}
462
463fn tenant_error(code: &str, message: impl Into<String>) -> DataError {
464 DataError::Query(format!("[{code}] {}", message.into()))
465}
466
467fn normalize_tenant_id(tenant_id: Option<&str>) -> Option<String> {
468 tenant_id
469 .map(str::trim)
470 .filter(|value| !value.is_empty())
471 .map(ToString::to_string)
472}
473
474fn tenant_id_from_row(row: &Row, tenant_field: &str) -> Option<String> {
475 row.get(tenant_field)
476 .and_then(Value::as_str)
477 .and_then(|value| normalize_tenant_id(Some(value)))
478}
479
480pub struct MemoryRepo {
481 driver: Box<dyn AdapterDriver>,
482 tables: BTreeMap<String, Vec<StoredRow>>,
483 next_id: u64,
484}
485
486impl MemoryRepo {
487 pub fn new(driver: Box<dyn AdapterDriver>) -> Self {
488 Self {
489 driver,
490 tables: BTreeMap::new(),
491 next_id: 1,
492 }
493 }
494}
495
496impl Repo for MemoryRepo {
497 fn adapter_kind(&self) -> AdapterKind {
498 self.driver.kind()
499 }
500
501 fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow> {
502 let started_at = Instant::now();
503 let result = {
504 let entry = self.tables.entry(table.to_string()).or_default();
505 let row = StoredRow {
506 id: self.next_id,
507 data,
508 };
509 self.next_id += 1;
510 entry.push(row.clone());
511 Ok(row)
512 };
513
514 match &result {
515 Ok(row) => info!(
516 target: "shelly.data.query",
517 source = "memory_repo",
518 adapter = self.driver.kind().as_str(),
519 operation = "insert",
520 table,
521 row_id = row.id,
522 duration_ms = started_at.elapsed().as_millis() as u64,
523 "Shelly data query executed"
524 ),
525 Err(err) => warn!(
526 target: "shelly.data.query",
527 source = "memory_repo",
528 adapter = self.driver.kind().as_str(),
529 operation = "insert",
530 table,
531 duration_ms = started_at.elapsed().as_millis() as u64,
532 error = %err,
533 "Shelly data query failed"
534 ),
535 }
536
537 result
538 }
539
540 fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow> {
541 let started_at = Instant::now();
542 let result = {
543 let rows = self.tables.entry(table.to_string()).or_default();
544 match rows.iter_mut().find(|row| row.id == id) {
545 Some(existing) => {
546 existing.data = data;
547 Ok(existing.clone())
548 }
549 None => Err(DataError::Query(format!(
550 "row id {id} not found in table `{table}`"
551 ))),
552 }
553 };
554
555 match &result {
556 Ok(row) => info!(
557 target: "shelly.data.query",
558 source = "memory_repo",
559 adapter = self.driver.kind().as_str(),
560 operation = "update",
561 table,
562 row_id = row.id,
563 duration_ms = started_at.elapsed().as_millis() as u64,
564 "Shelly data query executed"
565 ),
566 Err(err) => warn!(
567 target: "shelly.data.query",
568 source = "memory_repo",
569 adapter = self.driver.kind().as_str(),
570 operation = "update",
571 table,
572 row_id = id,
573 duration_ms = started_at.elapsed().as_millis() as u64,
574 error = %err,
575 "Shelly data query failed"
576 ),
577 }
578
579 result
580 }
581
582 fn delete(&mut self, table: &str, id: u64) -> DataResult<()> {
583 let started_at = Instant::now();
584 let result = {
585 let rows = self.tables.entry(table.to_string()).or_default();
586 let initial_len = rows.len();
587 rows.retain(|row| row.id != id);
588 if rows.len() == initial_len {
589 Err(DataError::Query(format!(
590 "row id {id} not found in table `{table}`"
591 )))
592 } else {
593 Ok(())
594 }
595 };
596
597 match &result {
598 Ok(()) => info!(
599 target: "shelly.data.query",
600 source = "memory_repo",
601 adapter = self.driver.kind().as_str(),
602 operation = "delete",
603 table,
604 row_id = id,
605 duration_ms = started_at.elapsed().as_millis() as u64,
606 "Shelly data query executed"
607 ),
608 Err(err) => warn!(
609 target: "shelly.data.query",
610 source = "memory_repo",
611 adapter = self.driver.kind().as_str(),
612 operation = "delete",
613 table,
614 row_id = id,
615 duration_ms = started_at.elapsed().as_millis() as u64,
616 error = %err,
617 "Shelly data query failed"
618 ),
619 }
620
621 result
622 }
623
624 fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>> {
625 let started_at = Instant::now();
626 let result = Ok(self
627 .tables
628 .get(table)
629 .and_then(|rows| rows.iter().find(|row| row.id == id))
630 .cloned());
631
632 match &result {
633 Ok(row) => info!(
634 target: "shelly.data.query",
635 source = "memory_repo",
636 adapter = self.driver.kind().as_str(),
637 operation = "find",
638 table,
639 row_id = id,
640 found = row.is_some(),
641 duration_ms = started_at.elapsed().as_millis() as u64,
642 "Shelly data query executed"
643 ),
644 Err(err) => warn!(
645 target: "shelly.data.query",
646 source = "memory_repo",
647 adapter = self.driver.kind().as_str(),
648 operation = "find",
649 table,
650 row_id = id,
651 duration_ms = started_at.elapsed().as_millis() as u64,
652 error = %err,
653 "Shelly data query failed"
654 ),
655 }
656
657 result
658 }
659
660 fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>> {
661 let started_at = Instant::now();
662 let result = {
663 let rows = self.tables.get(table).cloned().unwrap_or_default();
664 let rows = materialize_rows(rows, query);
665 Ok(rows)
666 };
667
668 match &result {
669 Ok(rows) => info!(
670 target: "shelly.data.query",
671 source = "memory_repo",
672 adapter = self.driver.kind().as_str(),
673 operation = "list",
674 table,
675 row_count = rows.len(),
676 filter_count = query.filters.len(),
677 sort_count = query.sorts.len(),
678 page = query.pagination.map(|value| value.page),
679 per_page = query.pagination.map(|value| value.per_page),
680 keyset_limit = query.keyset.as_ref().map(|value| value.limit),
681 wire_format = ?query.wire_format,
682 duration_ms = started_at.elapsed().as_millis() as u64,
683 "Shelly data query executed"
684 ),
685 Err(err) => warn!(
686 target: "shelly.data.query",
687 source = "memory_repo",
688 adapter = self.driver.kind().as_str(),
689 operation = "list",
690 table,
691 filter_count = query.filters.len(),
692 sort_count = query.sorts.len(),
693 page = query.pagination.map(|value| value.page),
694 per_page = query.pagination.map(|value| value.per_page),
695 keyset_limit = query.keyset.as_ref().map(|value| value.limit),
696 wire_format = ?query.wire_format,
697 duration_ms = started_at.elapsed().as_millis() as u64,
698 error = %err,
699 "Shelly data query failed"
700 ),
701 }
702
703 result
704 }
705
706 fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage> {
707 if !query.has_valid_window_token() {
708 return Err(DataError::Query(
709 "window token query fingerprint mismatch".to_string(),
710 ));
711 }
712
713 let started_at = Instant::now();
714 let all_rows = materialize_rows(self.tables.get(table).cloned().unwrap_or_default(), query);
715 let total_rows = all_rows.len();
716 let (offset, limit) = window_range_from_query(query, total_rows);
717 let rows = all_rows
718 .into_iter()
719 .skip(offset)
720 .take(limit.max(1))
721 .collect::<Vec<_>>();
722 let token = query.next_window_token(offset, limit.max(1), now_epoch_ms(), {
723 (offset as u64)
724 ^ (limit as u64)
725 ^ (rows.len() as u64)
726 ^ (total_rows as u64)
727 ^ self.next_id
728 });
729 let compact_rows =
730 (query.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
731 let page = WindowPage {
732 offset,
733 limit: limit.max(1),
734 total_rows,
735 rows,
736 query_fingerprint: query.fingerprint(),
737 token,
738 wire_format: query.wire_format,
739 compact_rows,
740 };
741
742 info!(
743 target: "shelly.data.query",
744 source = "memory_repo",
745 adapter = self.driver.kind().as_str(),
746 operation = "list_window",
747 table,
748 row_count = page.rows.len(),
749 total_rows = page.total_rows,
750 offset = page.offset,
751 limit = page.limit,
752 wire_format = ?page.wire_format,
753 duration_ms = started_at.elapsed().as_millis() as u64,
754 "Shelly data query executed"
755 );
756
757 Ok(page)
758 }
759
760 fn materialize_incremental_diff(
761 &self,
762 previous: &WindowPage,
763 current: &WindowPage,
764 ) -> IncrementalDiff {
765 let full_resync = previous.query_fingerprint != current.query_fingerprint
766 || previous.wire_format != current.wire_format;
767
768 if full_resync {
769 return IncrementalDiff {
770 from_token: Some(previous.token.clone()),
771 to_token: current.token.clone(),
772 full_resync: true,
773 inserted: current.rows.clone(),
774 updated: Vec::new(),
775 removed_ids: Vec::new(),
776 };
777 }
778
779 let previous_rows: HashMap<u64, &StoredRow> =
780 previous.rows.iter().map(|row| (row.id, row)).collect();
781 let current_rows: HashMap<u64, &StoredRow> =
782 current.rows.iter().map(|row| (row.id, row)).collect();
783
784 let mut inserted = Vec::new();
785 let mut updated = Vec::new();
786 for row in ¤t.rows {
787 match previous_rows.get(&row.id) {
788 None => inserted.push(row.clone()),
789 Some(previous_row) if previous_row.data != row.data => updated.push(row.clone()),
790 _ => {}
791 }
792 }
793
794 let mut removed_ids = previous_rows
795 .keys()
796 .filter(|id| !current_rows.contains_key(id))
797 .copied()
798 .collect::<Vec<_>>();
799 removed_ids.sort_unstable();
800
801 IncrementalDiff {
802 from_token: Some(previous.token.clone()),
803 to_token: current.token.clone(),
804 full_resync: false,
805 inserted,
806 updated,
807 removed_ids,
808 }
809 }
810}
811
812fn materialize_rows(mut rows: Vec<StoredRow>, query: &Query) -> Vec<StoredRow> {
813 if !query.filters.is_empty() {
814 rows.retain(|row| {
815 query
816 .filters
817 .iter()
818 .all(|filter| matches_filter(row, filter))
819 });
820 }
821
822 for sort in query.sorts.iter().rev() {
823 rows.sort_by(|left, right| compare_for_sort(left, right, sort.field.as_str()));
824 if sort.direction == SortDirection::Desc {
825 rows.reverse();
826 }
827 }
828
829 if let Some(keyset) = &query.keyset {
830 rows = apply_keyset(rows, keyset);
831 } else if let Some(pagination) = query.pagination {
832 let offset = (pagination.page.saturating_sub(1)) * pagination.per_page;
833 rows = rows
834 .into_iter()
835 .skip(offset)
836 .take(pagination.per_page)
837 .collect();
838 }
839
840 rows
841}
842
843fn apply_keyset(rows: Vec<StoredRow>, keyset: &crate::query::KeysetPagination) -> Vec<StoredRow> {
844 let limit = keyset.limit.max(1);
845 let Some(cursor) = keyset.cursor.as_ref() else {
846 return rows.into_iter().take(limit).collect();
847 };
848
849 match cursor.direction {
850 KeysetDirection::Forward => rows
851 .into_iter()
852 .filter(|row| compare_row_cursor(row, cursor) == Ordering::Greater)
853 .take(limit)
854 .collect(),
855 KeysetDirection::Backward => {
856 let mut filtered = rows
857 .into_iter()
858 .filter(|row| compare_row_cursor(row, cursor) == Ordering::Less)
859 .collect::<Vec<_>>();
860 let keep_from = filtered.len().saturating_sub(limit);
861 filtered.drain(0..keep_from);
862 filtered
863 }
864 }
865}
866
867fn compare_row_cursor(row: &StoredRow, cursor: &crate::query::KeysetCursor) -> Ordering {
868 row.data
869 .get(&cursor.field)
870 .map(|value| compare_json_values(value, &cursor.value))
871 .unwrap_or_else(|| row.id.cmp(&cursor.value.as_u64().unwrap_or_default()))
872}
873
874fn compare_json_values(left: &Value, right: &Value) -> Ordering {
875 match (left, right) {
876 (Value::Number(_), Value::Number(_)) => {
877 compare_numbers(left, right).unwrap_or(Ordering::Equal)
878 }
879 (Value::String(left), Value::String(right)) => left.cmp(right),
880 (Value::Bool(left), Value::Bool(right)) => left.cmp(right),
881 _ => left.to_string().cmp(&right.to_string()),
882 }
883}
884
885fn window_range_from_query(query: &Query, total_rows: usize) -> (usize, usize) {
886 if let Some(window) = query.window {
887 let span = window.span().max(1);
888 let overscan = window.overscan;
889 let start = window.start.saturating_sub(overscan);
890 let limit = span.saturating_add(overscan.saturating_mul(2)).max(1);
891 if total_rows == 0 {
892 (0, limit)
893 } else {
894 (start.min(total_rows.saturating_sub(1)), limit)
895 }
896 } else if let Some(pagination) = query.pagination {
897 (
898 (pagination.page.saturating_sub(1)) * pagination.per_page,
899 pagination.per_page.max(1),
900 )
901 } else if let Some(keyset) = &query.keyset {
902 (
903 query
904 .window_token
905 .as_ref()
906 .map(|token| token.offset)
907 .unwrap_or(0),
908 keyset.limit.max(1),
909 )
910 } else {
911 (
912 query
913 .window_token
914 .as_ref()
915 .map(|token| token.offset)
916 .unwrap_or(0),
917 query
918 .window_token
919 .as_ref()
920 .map(|token| token.limit)
921 .unwrap_or_else(|| total_rows.max(1)),
922 )
923 }
924}
925
926fn encode_compact_rows(rows: &[StoredRow]) -> CompactRowsPayload {
927 let mut columns = vec!["id".to_string()];
928 for row in rows {
929 for key in row.data.keys() {
930 if !columns.iter().any(|column| column == key) {
931 columns.push(key.clone());
932 }
933 }
934 }
935
936 let mut encoded_rows = Vec::with_capacity(rows.len());
937 for row in rows {
938 let mut encoded = Vec::with_capacity(columns.len());
939 for column in &columns {
940 if column == "id" {
941 encoded.push(Value::from(row.id));
942 } else {
943 encoded.push(row.data.get(column).cloned().unwrap_or(Value::Null));
944 }
945 }
946 encoded_rows.push(encoded);
947 }
948
949 CompactRowsPayload {
950 columns,
951 rows: encoded_rows,
952 }
953}
954
955fn now_epoch_ms() -> u64 {
956 SystemTime::now()
957 .duration_since(UNIX_EPOCH)
958 .map(|duration| duration.as_millis() as u64)
959 .unwrap_or_default()
960}
961
962fn matches_filter(row: &StoredRow, filter: &crate::query::Filter) -> bool {
963 let Some(candidate) = row.data.get(&filter.field) else {
964 return false;
965 };
966 match filter.op {
967 FilterOperator::Eq => candidate == &filter.value,
968 FilterOperator::Neq => candidate != &filter.value,
969 FilterOperator::Contains => candidate
970 .as_str()
971 .zip(filter.value.as_str())
972 .is_some_and(|(left, right)| left.contains(right)),
973 FilterOperator::Gt => {
974 compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Greater)
975 }
976 FilterOperator::Gte => compare_numbers(candidate, &filter.value)
977 .is_some_and(|ord| ord == Ordering::Greater || ord == Ordering::Equal),
978 FilterOperator::Lt => {
979 compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Less)
980 }
981 FilterOperator::Lte => compare_numbers(candidate, &filter.value)
982 .is_some_and(|ord| ord == Ordering::Less || ord == Ordering::Equal),
983 }
984}
985
986fn compare_for_sort(left: &StoredRow, right: &StoredRow, field: &str) -> Ordering {
987 let left_value = left.data.get(field);
988 let right_value = right.data.get(field);
989 match (left_value, right_value) {
990 (Some(Value::Number(left_num)), Some(Value::Number(right_num))) => left_num
991 .as_f64()
992 .partial_cmp(&right_num.as_f64())
993 .unwrap_or(Ordering::Equal),
994 (Some(Value::String(left_text)), Some(Value::String(right_text))) => {
995 left_text.cmp(right_text)
996 }
997 _ => left.id.cmp(&right.id),
998 }
999}
1000
1001fn compare_numbers(left: &Value, right: &Value) -> Option<Ordering> {
1002 left.as_f64()
1003 .zip(right.as_f64())
1004 .and_then(|(left, right)| left.partial_cmp(&right))
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009 use super::{
1010 adapter_for, DatabaseConfig, MemoryRepo, Repo, Row, TenantPolicyDecision,
1011 TenantRepoOperation, TenantScopedRepo,
1012 };
1013 use crate::{
1014 AdapterKind, DataError, Filter, FilterOperator, Query, QueryContext, SortDirection,
1015 WireFormatProfile,
1016 };
1017 use serde_json::json;
1018
1019 #[test]
1020 fn memory_repo_works_for_adapter_selection() {
1021 let mut repo = MemoryRepo::new(
1022 adapter_for(&DatabaseConfig {
1023 adapter: AdapterKind::Sqlite,
1024 url: None,
1025 url_env: None,
1026 })
1027 .unwrap(),
1028 );
1029 let mut row = Row::new();
1030 row.insert("title".to_string(), json!("Alpha"));
1031 row.insert("score".to_string(), json!(10));
1032 repo.insert("posts", row).unwrap();
1033
1034 let rows = repo
1035 .list(
1036 "posts",
1037 &Query::new()
1038 .where_filter(Filter::contains("title", "Al"))
1039 .order_by("score", SortDirection::Desc),
1040 )
1041 .unwrap();
1042 assert_eq!(rows.len(), 1);
1043 assert_eq!(rows[0].data.get("title"), Some(&json!("Alpha")));
1044 }
1045
1046 #[test]
1047 fn adapter_for_rejects_none_and_selects_expected_driver() {
1048 let none_result = adapter_for(&DatabaseConfig {
1049 adapter: AdapterKind::None,
1050 url: None,
1051 url_env: None,
1052 });
1053 assert!(matches!(none_result, Err(DataError::Adapter(_))));
1054
1055 for kind in [
1056 AdapterKind::Postgres,
1057 AdapterKind::MySql,
1058 AdapterKind::Sqlite,
1059 AdapterKind::SingleStore,
1060 AdapterKind::ClickHouse,
1061 AdapterKind::BigQuery,
1062 AdapterKind::OpenSearch,
1063 ] {
1064 let driver = adapter_for(&DatabaseConfig {
1065 adapter: kind,
1066 url: None,
1067 url_env: None,
1068 })
1069 .expect("driver should be created");
1070 assert_eq!(driver.kind(), kind);
1071 }
1072 }
1073
1074 #[test]
1075 fn update_delete_and_find_cover_missing_rows() {
1076 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1077
1078 let mut row = Row::new();
1079 row.insert("title".to_string(), json!("Draft"));
1080 let inserted = repo.insert("posts", row).expect("insert should work");
1081
1082 assert_eq!(
1083 repo.find("posts", inserted.id)
1084 .expect("find should not fail")
1085 .map(|it| it.id),
1086 Some(inserted.id)
1087 );
1088 assert!(repo
1089 .find("posts", 999)
1090 .expect("find should not fail")
1091 .is_none());
1092 assert!(repo
1093 .find("missing_table", inserted.id)
1094 .expect("find should not fail")
1095 .is_none());
1096
1097 let mut updated = Row::new();
1098 updated.insert("title".to_string(), json!("Published"));
1099 let updated_row = repo
1100 .update("posts", inserted.id, updated)
1101 .expect("update should work");
1102 assert_eq!(updated_row.data.get("title"), Some(&json!("Published")));
1103
1104 let update_err = repo
1105 .update("posts", 404, Row::new())
1106 .expect_err("missing row should fail update");
1107 assert!(matches!(update_err, DataError::Query(_)));
1108
1109 repo.delete("posts", inserted.id)
1110 .expect("delete should remove row");
1111 let delete_err = repo
1112 .delete("posts", inserted.id)
1113 .expect_err("deleting missing row should fail");
1114 assert!(matches!(delete_err, DataError::Query(_)));
1115 }
1116
1117 #[test]
1118 fn tenant_scoped_repo_enforces_row_level_isolation() {
1119 let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1120 let mut scoped = TenantScopedRepo::new(repo);
1121
1122 let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1123 let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1124
1125 let mut first = Row::new();
1126 first.insert("name".to_string(), json!("Acme"));
1127 let inserted = scoped
1128 .insert_scoped("accounts", &tenant_a, first)
1129 .expect("insert tenant-a row");
1130
1131 let found = scoped
1132 .find_scoped("accounts", inserted.id, &tenant_a)
1133 .expect("find tenant-a row");
1134 assert!(found.is_some());
1135
1136 let denied = scoped
1137 .find_scoped("accounts", inserted.id, &tenant_b)
1138 .expect_err("cross-tenant find should be denied");
1139 assert!(denied.to_string().contains("tenant_row_mismatch"));
1140 }
1141
1142 #[test]
1143 fn tenant_scoped_repo_requires_tenant_context_by_default() {
1144 let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1145 let mut scoped = TenantScopedRepo::new(repo);
1146
1147 let mut row = Row::new();
1148 row.insert("name".to_string(), json!("NoTenant"));
1149 let err = scoped
1150 .insert_scoped("accounts", &QueryContext::default(), row)
1151 .expect_err("missing tenant context should be denied");
1152 assert!(err.to_string().contains("tenant_context_required"));
1153 }
1154
1155 #[test]
1156 fn tenant_scoped_repo_policy_hook_can_reject_operations() {
1157 let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1158 let mut scoped = TenantScopedRepo::new(repo).with_policy_hook(|ctx| {
1159 if ctx.operation == TenantRepoOperation::Delete {
1160 TenantPolicyDecision::deny("tenant_delete_denied", "tenant delete denied")
1161 } else {
1162 TenantPolicyDecision::allow()
1163 }
1164 });
1165
1166 let tenant = QueryContext::default().with_tenant_id("tenant-a");
1167 let mut row = Row::new();
1168 row.insert("name".to_string(), json!("Acme"));
1169 let inserted = scoped
1170 .insert_scoped("accounts", &tenant, row)
1171 .expect("insert tenant row");
1172
1173 let err = scoped
1174 .delete_scoped("accounts", inserted.id, &tenant)
1175 .expect_err("policy should reject delete");
1176 assert!(err.to_string().contains("tenant_delete_denied"));
1177 }
1178
1179 #[test]
1180 fn tenant_scoped_repo_update_and_window_list_enforce_scope() {
1181 let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1182 let mut scoped = TenantScopedRepo::new(repo);
1183
1184 let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1185 let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1186
1187 let mut first = Row::new();
1188 first.insert("name".to_string(), json!("Acme"));
1189 first.insert("score".to_string(), json!(10));
1190 let inserted = scoped
1191 .insert_scoped("accounts", &tenant_a, first)
1192 .expect("insert tenant-a row");
1193
1194 let mut update = Row::new();
1195 update.insert("name".to_string(), json!("Acme Prime"));
1196 update.insert("score".to_string(), json!(20));
1197 let updated = scoped
1198 .update_scoped("accounts", inserted.id, &tenant_a, update)
1199 .expect("update tenant-a row");
1200 assert_eq!(updated.data.get("tenant_id"), Some(&json!("tenant-a")));
1201
1202 let mut bad_update = Row::new();
1203 bad_update.insert("name".to_string(), json!("Cross"));
1204 bad_update.insert("tenant_id".to_string(), json!("tenant-b"));
1205 let err = scoped
1206 .update_scoped("accounts", inserted.id, &tenant_a, bad_update)
1207 .expect_err("tenant mismatch on row payload should fail");
1208 assert!(err.to_string().contains("tenant_row_mismatch"));
1209
1210 let list = scoped
1211 .list_scoped(
1212 "accounts",
1213 &Query::new().order_by("score", SortDirection::Asc),
1214 &tenant_a,
1215 )
1216 .expect("list scoped");
1217 assert_eq!(list.len(), 1);
1218
1219 let window = scoped
1220 .list_window_scoped(
1221 "accounts",
1222 &Query::new().order_by("score", SortDirection::Asc).window(1, 5, 2),
1223 &tenant_a,
1224 )
1225 .expect("list window scoped");
1226 assert_eq!(window.rows.len(), 1);
1227
1228 let denied = scoped
1229 .find_scoped("accounts", inserted.id, &tenant_b)
1230 .expect_err("cross-tenant find should still fail");
1231 assert!(denied.to_string().contains("tenant_row_mismatch"));
1232 }
1233
1234 #[test]
1235 fn list_applies_filters_sorts_and_pagination() {
1236 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1237
1238 let mut alpha = Row::new();
1239 alpha.insert("title".to_string(), json!("Alpha"));
1240 alpha.insert("score".to_string(), json!(10));
1241 alpha.insert("tag".to_string(), json!("core"));
1242 repo.insert("posts", alpha).expect("insert alpha");
1243
1244 let mut beta = Row::new();
1245 beta.insert("title".to_string(), json!("Beta"));
1246 beta.insert("score".to_string(), json!(20));
1247 beta.insert("tag".to_string(), json!("ops"));
1248 repo.insert("posts", beta).expect("insert beta");
1249
1250 let mut gamma = Row::new();
1251 gamma.insert("title".to_string(), json!("Gamma"));
1252 gamma.insert("score".to_string(), json!(15));
1253 gamma.insert("tag".to_string(), json!(123));
1254 repo.insert("posts", gamma).expect("insert gamma");
1255
1256 let eq_rows = repo
1257 .list(
1258 "posts",
1259 &Query::new().where_filter(Filter::eq("title", json!("Alpha"))),
1260 )
1261 .expect("eq filter");
1262 assert_eq!(eq_rows.len(), 1);
1263 assert_eq!(eq_rows[0].data.get("title"), Some(&json!("Alpha")));
1264
1265 let neq_rows = repo
1266 .list(
1267 "posts",
1268 &Query::new().where_filter(crate::Filter {
1269 field: "title".to_string(),
1270 op: FilterOperator::Neq,
1271 value: json!("Alpha"),
1272 }),
1273 )
1274 .expect("neq filter");
1275 assert_eq!(neq_rows.len(), 2);
1276
1277 let contains_rows = repo
1278 .list(
1279 "posts",
1280 &Query::new().where_filter(Filter::contains("title", "mm")),
1281 )
1282 .expect("contains filter");
1283 assert_eq!(contains_rows.len(), 1);
1284 assert_eq!(contains_rows[0].data.get("title"), Some(&json!("Gamma")));
1285
1286 let contains_non_string_rows = repo
1287 .list(
1288 "posts",
1289 &Query::new().where_filter(Filter::contains("tag", "2")),
1290 )
1291 .expect("contains on mixed type");
1292 assert!(contains_non_string_rows.is_empty());
1293
1294 for (op, expected_titles) in [
1295 (FilterOperator::Gt, vec!["Beta"]),
1296 (FilterOperator::Gte, vec!["Beta", "Gamma"]),
1297 (FilterOperator::Lt, vec!["Alpha"]),
1298 (FilterOperator::Lte, vec!["Alpha", "Gamma"]),
1299 ] {
1300 let rows = repo
1301 .list(
1302 "posts",
1303 &Query::new().where_filter(crate::Filter {
1304 field: "score".to_string(),
1305 op,
1306 value: json!(15),
1307 }),
1308 )
1309 .expect("numeric filter");
1310 let titles: Vec<&str> = rows
1311 .iter()
1312 .map(|row| {
1313 row.data
1314 .get("title")
1315 .and_then(|value| value.as_str())
1316 .expect("title")
1317 })
1318 .collect();
1319 assert_eq!(titles, expected_titles);
1320 }
1321
1322 let unknown_field_sort = repo
1323 .list(
1324 "posts",
1325 &Query::new()
1326 .order_by("missing", SortDirection::Desc)
1327 .paginate(1, 2),
1328 )
1329 .expect("fallback sort");
1330 assert_eq!(unknown_field_sort.len(), 2);
1331 assert_eq!(unknown_field_sort[0].id, 3);
1332 assert_eq!(unknown_field_sort[1].id, 2);
1333
1334 let score_sort = repo
1335 .list(
1336 "posts",
1337 &Query::new()
1338 .order_by("score", SortDirection::Desc)
1339 .order_by("title", SortDirection::Asc),
1340 )
1341 .expect("score sort");
1342 let score_titles: Vec<&str> = score_sort
1343 .iter()
1344 .map(|row| {
1345 row.data
1346 .get("title")
1347 .and_then(|value| value.as_str())
1348 .expect("title")
1349 })
1350 .collect();
1351 assert_eq!(score_titles, vec!["Beta", "Gamma", "Alpha"]);
1352 }
1353
1354 #[test]
1355 fn keyset_pagination_supports_forward_and_backward_windows() {
1356 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1357
1358 for score in 1..=6 {
1359 let mut row = Row::new();
1360 row.insert("title".to_string(), json!(format!("R{score}")));
1361 row.insert("score".to_string(), json!(score));
1362 repo.insert("scores", row).expect("insert score row");
1363 }
1364
1365 let forward_rows = repo
1366 .list(
1367 "scores",
1368 &Query::new()
1369 .order_by("score", SortDirection::Asc)
1370 .keyset_after("score", json!(2), 3),
1371 )
1372 .expect("keyset forward");
1373 let forward_scores: Vec<i64> = forward_rows
1374 .iter()
1375 .map(|row| {
1376 row.data
1377 .get("score")
1378 .and_then(|value| value.as_i64())
1379 .expect("score")
1380 })
1381 .collect();
1382 assert_eq!(forward_scores, vec![3, 4, 5]);
1383
1384 let backward_rows = repo
1385 .list(
1386 "scores",
1387 &Query::new()
1388 .order_by("score", SortDirection::Asc)
1389 .keyset_before("score", json!(5), 2),
1390 )
1391 .expect("keyset backward");
1392 let backward_scores: Vec<i64> = backward_rows
1393 .iter()
1394 .map(|row| {
1395 row.data
1396 .get("score")
1397 .and_then(|value| value.as_i64())
1398 .expect("score")
1399 })
1400 .collect();
1401 assert_eq!(backward_scores, vec![3, 4]);
1402 }
1403
1404 #[test]
1405 fn list_window_emits_tokens_and_compact_payload() {
1406 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1407
1408 for score in 0..300 {
1409 let mut row = Row::new();
1410 row.insert("score".to_string(), json!(score));
1411 row.insert(
1412 "tenant".to_string(),
1413 json!(if score % 2 == 0 { "a" } else { "b" }),
1414 );
1415 repo.insert("accounts", row).expect("insert account");
1416 }
1417
1418 let query = Query::new()
1419 .order_by("score", SortDirection::Asc)
1420 .window(100, 240, 20)
1421 .wire_format(WireFormatProfile::Compact);
1422 let page = repo.list_window("accounts", &query).expect("list window");
1423
1424 assert_eq!(page.total_rows, 300);
1425 assert_eq!(page.offset, 80);
1426 assert_eq!(page.limit, 180);
1427 assert_eq!(page.rows.len(), 180);
1428 assert_eq!(page.query_fingerprint, query.fingerprint());
1429 assert_eq!(page.token.query_fingerprint, query.fingerprint());
1430 let compact = page.compact_rows.expect("compact payload");
1431 assert!(compact.columns.contains(&"id".to_string()));
1432 assert!(compact.columns.contains(&"score".to_string()));
1433 assert_eq!(compact.rows.len(), page.rows.len());
1434 }
1435
1436 #[test]
1437 fn list_window_rejects_mismatched_query_token() {
1438 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1439 let mut row = Row::new();
1440 row.insert("score".to_string(), json!(1));
1441 repo.insert("accounts", row).expect("insert account");
1442
1443 let base = Query::new()
1444 .where_filter(Filter::eq("score", json!(1)))
1445 .paginate(1, 20);
1446 let stale_token = base.next_window_token(0, 20, 42, 1);
1447 let mismatched = Query::new()
1448 .where_filter(Filter::eq("score", json!(2)))
1449 .paginate(1, 20)
1450 .with_window_token(stale_token);
1451
1452 let err = repo
1453 .list_window("accounts", &mismatched)
1454 .expect_err("window token mismatch should fail");
1455 assert!(matches!(err, DataError::Query(_)));
1456 assert!(err.to_string().contains("window token"));
1457 }
1458
1459 #[test]
1460 fn incremental_diff_tracks_insert_update_and_remove() {
1461 let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1462
1463 for (title, score) in [("A", 10), ("B", 20), ("C", 30)] {
1464 let mut row = Row::new();
1465 row.insert("title".to_string(), json!(title));
1466 row.insert("score".to_string(), json!(score));
1467 repo.insert("accounts", row).expect("insert");
1468 }
1469
1470 let query = Query::new()
1471 .order_by("score", SortDirection::Asc)
1472 .paginate(1, 10);
1473 let previous = repo.list_window("accounts", &query).expect("previous page");
1474
1475 let mut updated_row = Row::new();
1476 updated_row.insert("title".to_string(), json!("B"));
1477 updated_row.insert("score".to_string(), json!(25));
1478 repo.update("accounts", 2, updated_row).expect("update row");
1479 repo.delete("accounts", 3).expect("delete row");
1480 let mut inserted_row = Row::new();
1481 inserted_row.insert("title".to_string(), json!("D"));
1482 inserted_row.insert("score".to_string(), json!(35));
1483 repo.insert("accounts", inserted_row).expect("insert row");
1484
1485 let current = repo.list_window("accounts", &query).expect("current page");
1486 let diff = repo.materialize_incremental_diff(&previous, ¤t);
1487
1488 assert!(!diff.full_resync);
1489 assert_eq!(diff.inserted.len(), 1);
1490 assert_eq!(diff.inserted[0].id, 4);
1491 assert_eq!(diff.updated.len(), 1);
1492 assert_eq!(diff.updated[0].id, 2);
1493 assert_eq!(diff.removed_ids, vec![3]);
1494 }
1495}