1use crate::{
2 adapter::{AdapterKind, DatabaseConfig},
3 error::{DataError, DataResult},
4 query::{
5 Filter, FilterOperator, KeysetDirection, Query, SortDirection, WindowToken,
6 WireFormatProfile,
7 },
8 QueryContext,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{
13 cmp::Ordering,
14 collections::{BTreeMap, HashMap},
15 sync::Arc,
16 time::{Instant, SystemTime, UNIX_EPOCH},
17};
18use tracing::{info, warn};
19
/// A single row's column values, keyed by column name.
///
/// A `BTreeMap` keeps columns in deterministic sorted order, which keeps
/// serialized output and compact-row encoding stable across runs.
pub type Row = BTreeMap<String, Value>;

/// A row persisted by a [`Repo`]: the repo-assigned id plus its column data.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StoredRow {
    // Identifier assigned by the repo on insert (MemoryRepo allocates these
    // monotonically starting at 1).
    pub id: u64,
    // Column name -> value payload for this row.
    pub data: Row,
}
27
/// Columnar encoding of a page of rows: one shared column list plus one
/// positional value vector per row (see `encode_compact_rows`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactRowsPayload {
    // Column names; `id` is always first, remaining columns in first-seen order.
    pub columns: Vec<String>,
    // Per-row values aligned to `columns`; missing cells are `Value::Null`.
    pub rows: Vec<Vec<Value>>,
}

/// One window of query results, as produced by `Repo::list_window`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowPage {
    // Row offset of this window within the full materialized result set.
    pub offset: usize,
    // Requested window size (always >= 1).
    pub limit: usize,
    // Total row count of the materialized result set (before windowing).
    pub total_rows: usize,
    // The rows inside the window.
    pub rows: Vec<StoredRow>,
    // Fingerprint of the query that produced this page; used to detect
    // query drift when diffing pages.
    pub query_fingerprint: String,
    // Continuation token for fetching the next window.
    pub token: WindowToken,
    // Wire format the page was produced for.
    pub wire_format: WireFormatProfile,
    // Present only when `wire_format` is `Compact`.
    pub compact_rows: Option<CompactRowsPayload>,
}
45
/// Delta between two [`WindowPage`]s of the same query, as computed by
/// `Repo::materialize_incremental_diff`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IncrementalDiff {
    // Token of the page the diff starts from, when known.
    pub from_token: Option<WindowToken>,
    // Token of the page the diff ends at.
    pub to_token: WindowToken,
    // True when the pages are not comparable (fingerprint or wire-format
    // changed); in that case `inserted` carries the whole current page.
    pub full_resync: bool,
    // Rows present in the current page but not the previous one.
    pub inserted: Vec<StoredRow>,
    // Rows present in both pages whose data changed.
    pub updated: Vec<StoredRow>,
    // Ids present in the previous page but not the current one (sorted).
    pub removed_ids: Vec<u64>,
}

/// Version-field based optimistic concurrency control descriptor, consumed
/// by [`OptimisticLockingRepo::update_with_optimistic_lock`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OptimisticLock {
    // Name of the row field holding the version counter.
    pub field: String,
    // Version the caller last observed; the update fails if it differs.
    pub expected_version: u64,
    // Amount to advance the version on success (clamped to >= 1).
    pub increment_by: u64,
}
62
63impl OptimisticLock {
64 pub fn new(field: impl Into<String>, expected_version: u64) -> Self {
65 Self {
66 field: field.into(),
67 expected_version,
68 increment_by: 1,
69 }
70 }
71
72 pub fn increment_by(mut self, increment_by: u64) -> Self {
73 self.increment_by = increment_by.max(1);
74 self
75 }
76}
77
/// Thresholds controlling slow-query and N+1 detection in
/// [`QueryObservabilityTracker`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservabilityPolicy {
    // Durations at or above this many milliseconds are flagged slow.
    pub slow_query_threshold_ms: u64,
    // How many recent fingerprints are kept per (table, operation, scope).
    pub n_plus_one_window_size: usize,
    // Repeats within the window (without preloads) that flag a possible N+1.
    pub n_plus_one_repeat_threshold: usize,
}
84
85impl Default for QueryObservabilityPolicy {
86 fn default() -> Self {
87 Self {
88 slow_query_threshold_ms: 250,
89 n_plus_one_window_size: 12,
90 n_plus_one_repeat_threshold: 3,
91 }
92 }
93}
94
/// One recorded query execution plus the signals derived from it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservation {
    pub table: String,
    // Operation label, e.g. "list" or "list_window".
    pub operation: String,
    // Fingerprint of the executed query (from `Query::fingerprint`).
    pub query_fingerprint: String,
    pub duration_ms: u64,
    // True when duration met the policy's slow-query threshold.
    pub slow_query: bool,
    // True when the same fingerprint repeated without preloads often enough
    // to suggest an N+1 access pattern.
    pub potential_n_plus_one: bool,
    // How many times this fingerprint appears in the current window.
    pub repeated_fingerprint_count: usize,
    // Preload associations declared on the query, copied for reporting.
    pub preloads: Vec<String>,
    // Correlation identifiers copied from the query context, when present.
    pub trace_id: Option<String>,
    pub correlation_id: Option<String>,
    pub request_id: Option<String>,
    // Human-readable advice derived from the flags above.
    pub hints: Vec<String>,
}

/// Sliding-window tracker that turns query executions into
/// [`QueryObservation`]s according to a [`QueryObservabilityPolicy`].
#[derive(Debug, Default, Clone)]
pub struct QueryObservabilityTracker {
    policy: QueryObservabilityPolicy,
    // Recent fingerprints per scope key (table:operation:correlation-scope).
    recent_fingerprints: HashMap<String, Vec<String>>,
}
116
117impl QueryObservabilityTracker {
118 pub fn new(policy: QueryObservabilityPolicy) -> Self {
119 Self {
120 policy,
121 recent_fingerprints: HashMap::new(),
122 }
123 }
124
125 pub fn policy(&self) -> &QueryObservabilityPolicy {
126 &self.policy
127 }
128
129 pub fn observe(
130 &mut self,
131 table: &str,
132 operation: &str,
133 query: &Query,
134 context: &QueryContext,
135 duration_ms: u64,
136 ) -> QueryObservation {
137 let fingerprint = query.fingerprint();
138 let scope_key = observation_scope_key(table, operation, context);
139 let history = self.recent_fingerprints.entry(scope_key).or_default();
140 history.push(fingerprint.clone());
141 if history.len() > self.policy.n_plus_one_window_size {
142 let drain_count = history.len() - self.policy.n_plus_one_window_size;
143 history.drain(0..drain_count);
144 }
145
146 let repeated_fingerprint_count = history
147 .iter()
148 .filter(|value| *value == &fingerprint)
149 .count();
150 let potential_n_plus_one = query.preloads.is_empty()
151 && repeated_fingerprint_count >= self.policy.n_plus_one_repeat_threshold;
152 let slow_query = duration_ms >= self.policy.slow_query_threshold_ms;
153
154 let mut hints = Vec::new();
155 if slow_query {
156 hints.push(format!(
157 "slow query ({}ms >= {}ms); consider indexes and narrower filters",
158 duration_ms, self.policy.slow_query_threshold_ms
159 ));
160 }
161 if potential_n_plus_one {
162 hints.push(format!(
163 "repeated query fingerprint observed {} times without preloads; possible N+1",
164 repeated_fingerprint_count
165 ));
166 }
167
168 QueryObservation {
169 table: table.to_string(),
170 operation: operation.to_string(),
171 query_fingerprint: fingerprint,
172 duration_ms,
173 slow_query,
174 potential_n_plus_one,
175 repeated_fingerprint_count,
176 preloads: query.preloads.clone(),
177 trace_id: context.trace_id.clone(),
178 correlation_id: context.correlation_id().map(ToString::to_string),
179 request_id: context.request_id().map(ToString::to_string),
180 hints,
181 }
182 }
183}
184
/// Marker trait implemented by each backend driver so a repo can report
/// which adapter it is bound to.
pub trait AdapterDriver: Send + Sync {
    /// The adapter kind this driver represents.
    fn kind(&self) -> AdapterKind;
}
188
/// Marker driver for the PostgreSQL adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct PostgresAdapter;

impl AdapterDriver for PostgresAdapter {
    fn kind(&self) -> AdapterKind {
        AdapterKind::Postgres
    }
}

/// Marker driver for the MySQL adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct MySqlAdapter;

impl AdapterDriver for MySqlAdapter {
    fn kind(&self) -> AdapterKind {
        AdapterKind::MySql
    }
}

/// Marker driver for the SQLite adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteAdapter;

impl AdapterDriver for SqliteAdapter {
    fn kind(&self) -> AdapterKind {
        AdapterKind::Sqlite
    }
}

/// Marker driver for the SingleStore adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct SingleStoreDriver;

impl AdapterDriver for SingleStoreDriver {
    fn kind(&self) -> AdapterKind {
        AdapterKind::SingleStore
    }
}

/// Marker driver for the ClickHouse adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct ClickHouseDriver;

impl AdapterDriver for ClickHouseDriver {
    fn kind(&self) -> AdapterKind {
        AdapterKind::ClickHouse
    }
}

/// Marker driver for the BigQuery adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct BigQueryDriver;

impl AdapterDriver for BigQueryDriver {
    fn kind(&self) -> AdapterKind {
        AdapterKind::BigQuery
    }
}

/// Marker driver for the OpenSearch adapter.
#[derive(Debug, Default, Clone, Copy)]
pub struct OpenSearchAdapterDriver;

impl AdapterDriver for OpenSearchAdapterDriver {
    fn kind(&self) -> AdapterKind {
        AdapterKind::OpenSearch
    }
}
251
252pub fn adapter_for(config: &DatabaseConfig) -> DataResult<Box<dyn AdapterDriver>> {
253 match config.adapter {
254 AdapterKind::Postgres => Ok(Box::new(PostgresAdapter)),
255 AdapterKind::MySql => Ok(Box::new(MySqlAdapter)),
256 AdapterKind::Sqlite => Ok(Box::new(SqliteAdapter)),
257 AdapterKind::SingleStore => Ok(Box::new(SingleStoreDriver)),
258 AdapterKind::ClickHouse => Ok(Box::new(ClickHouseDriver)),
259 AdapterKind::BigQuery => Ok(Box::new(BigQueryDriver)),
260 AdapterKind::OpenSearch => Ok(Box::new(OpenSearchAdapterDriver)),
261 AdapterKind::None => Err(DataError::Adapter(
262 "database adapter is `none`; select a supported backend in shelly.data.toml"
263 .to_string(),
264 )),
265 }
266}
267
/// Core CRUD + query surface implemented by every repository backend.
pub trait Repo {
    /// The adapter this repo is bound to.
    fn adapter_kind(&self) -> AdapterKind;
    /// Inserts `data` into `table`, returning the row with its assigned id.
    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow>;
    /// Replaces the data of row `id` in `table`.
    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow>;
    /// Removes row `id` from `table`.
    fn delete(&mut self, table: &str, id: u64) -> DataResult<()>;
    /// Looks up row `id` in `table`; `Ok(None)` when absent.
    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>>;
    /// Runs `query` (filters, sorts, pagination/keyset) against `table`.
    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>>;
    /// Runs `query` and returns one tokenized window of the result set.
    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage>;
    /// Computes the row-level delta between two windows of the same query.
    fn materialize_incremental_diff(
        &self,
        previous: &WindowPage,
        current: &WindowPage,
    ) -> IncrementalDiff;
}

/// Adds all-or-nothing transaction semantics on top of [`Repo`].
pub trait RepoUnitOfWork: Repo {
    /// Runs `operation`; on `Err` all changes made inside it are rolled back.
    fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
    where
        F: FnOnce(&mut Self) -> DataResult<T>;
}

/// Adds version-checked updates on top of [`Repo`].
pub trait OptimisticLockingRepo: Repo {
    /// Merges `patch` into row `id` only if the row's `lock.field` still
    /// equals `lock.expected_version`; bumps the version on success.
    fn update_with_optimistic_lock(
        &mut self,
        table: &str,
        id: u64,
        patch: Row,
        lock: OptimisticLock,
    ) -> DataResult<StoredRow>;
}
298
/// The repository operation a tenant policy decision applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantRepoOperation {
    Insert,
    Update,
    Delete,
    Find,
    List,
    ListWindow,
}

/// Everything a [`TenantPolicyHook`] sees when deciding an operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyContext {
    pub table: String,
    pub operation: TenantRepoOperation,
    // Normalized tenant id from the query context, if any.
    pub tenant_id: Option<String>,
    // Normalized tenant id found on the affected row, if any.
    pub row_tenant_id: Option<String>,
    // Id of the affected row for row-level operations.
    pub row_id: Option<u64>,
}

/// Outcome of a tenant policy evaluation; denials carry a code and message
/// that are surfaced in the resulting error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyDecision {
    pub allowed: bool,
    pub code: Option<String>,
    pub message: Option<String>,
}
325
326impl TenantPolicyDecision {
327 pub fn allow() -> Self {
328 Self {
329 allowed: true,
330 code: None,
331 message: None,
332 }
333 }
334
335 pub fn deny(code: impl Into<String>, message: impl Into<String>) -> Self {
336 Self {
337 allowed: false,
338 code: Some(code.into()),
339 message: Some(message.into()),
340 }
341 }
342}
343
/// Callback consulted on every scoped operation; returning a deny decision
/// aborts the operation with a tenant error.
pub type TenantPolicyHook = Arc<dyn Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync>;

/// Configuration for [`TenantScopedRepo`].
#[derive(Clone)]
pub struct TenantRepoConfig {
    // Row field used to store/compare the tenant id.
    pub tenant_field: String,
    // When true, scoped calls fail if the context carries no tenant id.
    pub require_tenant_context: bool,
    // Optional policy callback consulted on every scoped operation.
    pub policy_hook: Option<TenantPolicyHook>,
}
352
353impl Default for TenantRepoConfig {
354 fn default() -> Self {
355 Self {
356 tenant_field: "tenant_id".to_string(),
357 require_tenant_context: true,
358 policy_hook: None,
359 }
360 }
361}
362
/// Decorator enforcing tenant isolation over any [`Repo`]: stamps the
/// tenant field on writes, filters reads by tenant, and consults an
/// optional policy hook on every operation.
pub struct TenantScopedRepo<R: Repo> {
    inner: R,
    config: TenantRepoConfig,
}
367
impl<R: Repo> TenantScopedRepo<R> {
    /// Wraps `inner` with the default configuration (`tenant_id` field,
    /// tenant context required, no policy hook).
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            config: TenantRepoConfig::default(),
        }
    }

    /// Replaces the tenant configuration wholesale (builder style).
    pub fn with_config(mut self, config: TenantRepoConfig) -> Self {
        self.config = config;
        self
    }

    /// Installs a policy hook consulted on every scoped operation
    /// (builder style).
    pub fn with_policy_hook<F>(mut self, policy_hook: F) -> Self
    where
        F: Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync + 'static,
    {
        self.config.policy_hook = Some(Arc::new(policy_hook));
        self
    }

    /// Borrows the wrapped repo.
    pub fn inner(&self) -> &R {
        &self.inner
    }

    /// Mutably borrows the wrapped repo. NOTE: operations through this
    /// reference bypass tenant scoping.
    pub fn inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Unwraps into the inner repo, discarding the tenant configuration.
    pub fn into_inner(self) -> R {
        self.inner
    }

    /// Inserts a row on behalf of the context tenant.
    ///
    /// Any tenant id already present in `data` must match the context
    /// tenant; the tenant field is then stamped (overwritten) before
    /// delegating to the inner repo.
    pub fn insert_scoped(
        &mut self,
        table: &str,
        context: &QueryContext,
        mut data: Row,
    ) -> DataResult<StoredRow> {
        let tenant_id = self.required_tenant_id(context)?;
        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
        self.ensure_tenant_match(
            table,
            TenantRepoOperation::Insert,
            tenant_id.as_deref(),
            row_tenant_id.as_deref(),
            None,
        )?;
        if let Some(tenant_id) = tenant_id.as_deref() {
            data.insert(
                self.config.tenant_field.clone(),
                Value::String(tenant_id.to_string()),
            );
        }
        self.inner.insert(table, data)
    }

    /// Updates a row on behalf of the context tenant.
    ///
    /// Checks both the stored row's tenant (when the row exists) and any
    /// tenant id embedded in the patch, then stamps the tenant field
    /// before delegating. A missing row is left for the inner repo to
    /// report.
    pub fn update_scoped(
        &mut self,
        table: &str,
        id: u64,
        context: &QueryContext,
        mut data: Row,
    ) -> DataResult<StoredRow> {
        let tenant_id = self.required_tenant_id(context)?;
        let existing = self.inner.find(table, id)?;
        if let Some(existing) = existing.as_ref() {
            // The row being replaced must belong to the context tenant.
            self.ensure_tenant_match(
                table,
                TenantRepoOperation::Update,
                tenant_id.as_deref(),
                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
                Some(id),
            )?;
        }
        // The incoming payload must not claim a different tenant either.
        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
        self.ensure_tenant_match(
            table,
            TenantRepoOperation::Update,
            tenant_id.as_deref(),
            row_tenant_id.as_deref(),
            Some(id),
        )?;
        if let Some(tenant_id) = tenant_id.as_deref() {
            data.insert(
                self.config.tenant_field.clone(),
                Value::String(tenant_id.to_string()),
            );
        }
        self.inner.update(table, id, data)
    }

    /// Deletes a row after verifying it belongs to the context tenant.
    /// A missing row is left for the inner repo to report.
    pub fn delete_scoped(
        &mut self,
        table: &str,
        id: u64,
        context: &QueryContext,
    ) -> DataResult<()> {
        let tenant_id = self.required_tenant_id(context)?;
        let existing = self.inner.find(table, id)?;
        if let Some(existing) = existing.as_ref() {
            self.ensure_tenant_match(
                table,
                TenantRepoOperation::Delete,
                tenant_id.as_deref(),
                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
                Some(id),
            )?;
        }
        self.inner.delete(table, id)
    }

    /// Finds a row; errors (rather than returning `None`) when the row
    /// exists but belongs to another tenant.
    pub fn find_scoped(
        &self,
        table: &str,
        id: u64,
        context: &QueryContext,
    ) -> DataResult<Option<StoredRow>> {
        let tenant_id = self.required_tenant_id(context)?;
        let row = self.inner.find(table, id)?;
        if let Some(row) = row.as_ref() {
            self.ensure_tenant_match(
                table,
                TenantRepoOperation::Find,
                tenant_id.as_deref(),
                tenant_id_from_row(&row.data, self.config.tenant_field.as_str()).as_deref(),
                Some(id),
            )?;
        }
        Ok(row)
    }

    /// Lists rows with an equality filter on the tenant field appended to
    /// the caller's query.
    pub fn list_scoped(
        &self,
        table: &str,
        query: &Query,
        context: &QueryContext,
    ) -> DataResult<Vec<StoredRow>> {
        let tenant_id = self.required_tenant_id(context)?;
        self.evaluate_policy(TenantPolicyContext {
            table: table.to_string(),
            operation: TenantRepoOperation::List,
            tenant_id: tenant_id.clone(),
            row_tenant_id: None,
            row_id: None,
        })?;
        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
        self.inner.list(table, &scoped_query)
    }

    /// Windowed variant of [`Self::list_scoped`].
    pub fn list_window_scoped(
        &self,
        table: &str,
        query: &Query,
        context: &QueryContext,
    ) -> DataResult<WindowPage> {
        let tenant_id = self.required_tenant_id(context)?;
        self.evaluate_policy(TenantPolicyContext {
            table: table.to_string(),
            operation: TenantRepoOperation::ListWindow,
            tenant_id: tenant_id.clone(),
            row_tenant_id: None,
            row_id: None,
        })?;
        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
        self.inner.list_window(table, &scoped_query)
    }

    // Clones the query, appending `tenant_field == tenant_id` when a tenant
    // is in scope; without a tenant the query passes through unchanged.
    fn scoped_query(&self, query: &Query, tenant_id: Option<&str>) -> Query {
        let mut scoped = query.clone();
        if let Some(tenant_id) = tenant_id {
            scoped.filters.push(Filter::eq(
                self.config.tenant_field.clone(),
                Value::String(tenant_id.to_string()),
            ));
        }
        scoped
    }

    // Normalizes the context tenant id; errors when the config demands a
    // tenant and none is present.
    fn required_tenant_id(&self, context: &QueryContext) -> DataResult<Option<String>> {
        let tenant_id = normalize_tenant_id(context.tenant_id.as_deref());
        if self.config.require_tenant_context && tenant_id.is_none() {
            return Err(tenant_error(
                "tenant_context_required",
                "tenant context is required for scoped data operation",
            ));
        }
        Ok(tenant_id)
    }

    // Rejects rows whose tenant differs from the context tenant, then runs
    // the policy hook. When either side has no tenant, only the hook runs.
    fn ensure_tenant_match(
        &self,
        table: &str,
        operation: TenantRepoOperation,
        tenant_id: Option<&str>,
        row_tenant_id: Option<&str>,
        row_id: Option<u64>,
    ) -> DataResult<()> {
        let normalized_tenant_id = normalize_tenant_id(tenant_id);
        let normalized_row_tenant_id = normalize_tenant_id(row_tenant_id);
        if let Some(expected_tenant) = normalized_tenant_id.as_deref() {
            if let Some(row_tenant) = normalized_row_tenant_id.as_deref() {
                if row_tenant != expected_tenant {
                    return Err(tenant_error(
                        "tenant_row_mismatch",
                        format!(
                            "row tenant does not match tenant context for table `{table}` (row_id={row_id:?})"
                        ),
                    ));
                }
            }
        }
        self.evaluate_policy(TenantPolicyContext {
            table: table.to_string(),
            operation,
            tenant_id: normalized_tenant_id,
            row_tenant_id: normalized_row_tenant_id,
            row_id,
        })
    }

    // Runs the configured policy hook, if any; a deny decision becomes a
    // tenant error carrying the hook's code/message (with fallbacks).
    fn evaluate_policy(&self, context: TenantPolicyContext) -> DataResult<()> {
        let Some(policy_hook) = self.config.policy_hook.as_ref() else {
            return Ok(());
        };
        let decision = policy_hook(&context);
        if decision.allowed {
            Ok(())
        } else {
            Err(tenant_error(
                decision.code.as_deref().unwrap_or("tenant_policy_denied"),
                decision
                    .message
                    .as_deref()
                    .unwrap_or("tenant policy denied data operation"),
            ))
        }
    }
}
607
608fn tenant_error(code: &str, message: impl Into<String>) -> DataError {
609 DataError::Query(format!("[{code}] {}", message.into()))
610}
611
612fn observation_scope_key(table: &str, operation: &str, context: &QueryContext) -> String {
613 let correlation_scope = context
614 .correlation_id()
615 .or_else(|| context.request_id())
616 .or(context.trace_id.as_deref())
617 .unwrap_or("global");
618 format!("{table}:{operation}:{correlation_scope}")
619}
620
/// Canonicalizes an optional tenant id: trims surrounding whitespace and
/// treats empty or whitespace-only values as absent.
fn normalize_tenant_id(tenant_id: Option<&str>) -> Option<String> {
    let trimmed = tenant_id?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
627
628fn tenant_id_from_row(row: &Row, tenant_field: &str) -> Option<String> {
629 row.get(tenant_field)
630 .and_then(Value::as_str)
631 .and_then(|value| normalize_tenant_id(Some(value)))
632}
633
/// In-memory [`Repo`] implementation: rows live in per-table vectors and
/// ids are allocated from a single monotonic counter shared by all tables.
pub struct MemoryRepo {
    // Reports which adapter kind this repo stands in for.
    driver: Box<dyn AdapterDriver>,
    // Table name -> rows, in insertion order.
    tables: BTreeMap<String, Vec<StoredRow>>,
    // Next id to assign on insert (starts at 1, never reused).
    next_id: u64,
}
639
640impl MemoryRepo {
641 pub fn new(driver: Box<dyn AdapterDriver>) -> Self {
642 Self {
643 driver,
644 tables: BTreeMap::new(),
645 next_id: 1,
646 }
647 }
648
649 pub fn list_observed(
650 &self,
651 table: &str,
652 query: &Query,
653 context: &QueryContext,
654 tracker: &mut QueryObservabilityTracker,
655 ) -> DataResult<(Vec<StoredRow>, QueryObservation)> {
656 let started_at = Instant::now();
657 let rows = self.list(table, query)?;
658 let observation = tracker.observe(
659 table,
660 "list",
661 query,
662 context,
663 started_at.elapsed().as_millis() as u64,
664 );
665 Ok((rows, observation))
666 }
667
668 pub fn list_window_observed(
669 &self,
670 table: &str,
671 query: &Query,
672 context: &QueryContext,
673 tracker: &mut QueryObservabilityTracker,
674 ) -> DataResult<(WindowPage, QueryObservation)> {
675 let started_at = Instant::now();
676 let page = self.list_window(table, query)?;
677 let observation = tracker.observe(
678 table,
679 "list_window",
680 query,
681 context,
682 started_at.elapsed().as_millis() as u64,
683 );
684 Ok((page, observation))
685 }
686}
687
impl Repo for MemoryRepo {
    fn adapter_kind(&self) -> AdapterKind {
        self.driver.kind()
    }

    // Appends a row with the next id and emits a structured query log
    // (info on success, warn on failure) on the `shelly.data.query` target.
    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow> {
        let started_at = Instant::now();
        let result = {
            let entry = self.tables.entry(table.to_string()).or_default();
            let row = StoredRow {
                id: self.next_id,
                data,
            };
            self.next_id += 1;
            entry.push(row.clone());
            Ok(row)
        };

        match &result {
            Ok(row) => info!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "insert",
                table,
                row_id = row.id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly data query executed"
            ),
            // This arm is currently unreachable (the block above always
            // returns Ok) but keeps the logging pattern uniform.
            Err(err) => warn!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "insert",
                table,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly data query failed"
            ),
        }

        result
    }

    // Replaces the data of row `id`; errors when the row does not exist.
    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow> {
        let started_at = Instant::now();
        let result = {
            let rows = self.tables.entry(table.to_string()).or_default();
            match rows.iter_mut().find(|row| row.id == id) {
                Some(existing) => {
                    existing.data = data;
                    Ok(existing.clone())
                }
                None => Err(DataError::Query(format!(
                    "row id {id} not found in table `{table}`"
                ))),
            }
        };

        match &result {
            Ok(row) => info!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "update",
                table,
                row_id = row.id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly data query executed"
            ),
            Err(err) => warn!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "update",
                table,
                row_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly data query failed"
            ),
        }

        result
    }

    // Removes row `id`; detects absence by comparing lengths before and
    // after `retain`, and errors when nothing was removed.
    fn delete(&mut self, table: &str, id: u64) -> DataResult<()> {
        let started_at = Instant::now();
        let result = {
            let rows = self.tables.entry(table.to_string()).or_default();
            let initial_len = rows.len();
            rows.retain(|row| row.id != id);
            if rows.len() == initial_len {
                Err(DataError::Query(format!(
                    "row id {id} not found in table `{table}`"
                )))
            } else {
                Ok(())
            }
        };

        match &result {
            Ok(()) => info!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "delete",
                table,
                row_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly data query executed"
            ),
            Err(err) => warn!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "delete",
                table,
                row_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly data query failed"
            ),
        }

        result
    }

    // Linear scan for row `id`; absence is `Ok(None)`, never an error.
    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>> {
        let started_at = Instant::now();
        let result = Ok(self
            .tables
            .get(table)
            .and_then(|rows| rows.iter().find(|row| row.id == id))
            .cloned());

        match &result {
            Ok(row) => info!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "find",
                table,
                row_id = id,
                found = row.is_some(),
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly data query executed"
            ),
            // Unreachable (result is always Ok) but kept for uniformity.
            Err(err) => warn!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "find",
                table,
                row_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly data query failed"
            ),
        }

        result
    }

    // Clones the table and delegates filtering/sorting/pagination to
    // `materialize_rows`, logging query shape metadata alongside timing.
    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>> {
        let started_at = Instant::now();
        let result = {
            let rows = self.tables.get(table).cloned().unwrap_or_default();
            let rows = materialize_rows(rows, query);
            Ok(rows)
        };

        match &result {
            Ok(rows) => info!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "list",
                table,
                row_count = rows.len(),
                filter_count = query.filters.len(),
                sort_count = query.sorts.len(),
                page = query.pagination.map(|value| value.page),
                per_page = query.pagination.map(|value| value.per_page),
                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
                wire_format = ?query.wire_format,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly data query executed"
            ),
            Err(err) => warn!(
                target: "shelly.data.query",
                source = "memory_repo",
                adapter = self.driver.kind().as_str(),
                operation = "list",
                table,
                filter_count = query.filters.len(),
                sort_count = query.sorts.len(),
                page = query.pagination.map(|value| value.page),
                per_page = query.pagination.map(|value| value.per_page),
                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
                wire_format = ?query.wire_format,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly data query failed"
            ),
        }

        result
    }

    // Materializes the full result set, slices out one window, and issues a
    // continuation token whose payload XORs window metrics with the id
    // counter so any data change perturbs the token.
    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage> {
        // Reject tokens minted for a different query shape.
        if !query.has_valid_window_token() {
            return Err(DataError::Query(
                "window token query fingerprint mismatch".to_string(),
            ));
        }

        let started_at = Instant::now();
        let all_rows = materialize_rows(self.tables.get(table).cloned().unwrap_or_default(), query);
        let total_rows = all_rows.len();
        let (offset, limit) = window_range_from_query(query, total_rows);
        let rows = all_rows
            .into_iter()
            .skip(offset)
            .take(limit.max(1))
            .collect::<Vec<_>>();
        let token = query.next_window_token(offset, limit.max(1), now_epoch_ms(), {
            (offset as u64)
                ^ (limit as u64)
                ^ (rows.len() as u64)
                ^ (total_rows as u64)
                ^ self.next_id
        });
        // Compact encoding is produced only when the query asks for it.
        let compact_rows =
            (query.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
        let page = WindowPage {
            offset,
            limit: limit.max(1),
            total_rows,
            rows,
            query_fingerprint: query.fingerprint(),
            token,
            wire_format: query.wire_format,
            compact_rows,
        };

        info!(
            target: "shelly.data.query",
            source = "memory_repo",
            adapter = self.driver.kind().as_str(),
            operation = "list_window",
            table,
            row_count = page.rows.len(),
            total_rows = page.total_rows,
            offset = page.offset,
            limit = page.limit,
            wire_format = ?page.wire_format,
            duration_ms = started_at.elapsed().as_millis() as u64,
            "Shelly data query executed"
        );

        Ok(page)
    }

    // Diffs two pages by row id. Pages from different queries or wire
    // formats are not comparable: the whole current page is returned as a
    // full resync.
    fn materialize_incremental_diff(
        &self,
        previous: &WindowPage,
        current: &WindowPage,
    ) -> IncrementalDiff {
        let full_resync = previous.query_fingerprint != current.query_fingerprint
            || previous.wire_format != current.wire_format;

        if full_resync {
            return IncrementalDiff {
                from_token: Some(previous.token.clone()),
                to_token: current.token.clone(),
                full_resync: true,
                inserted: current.rows.clone(),
                updated: Vec::new(),
                removed_ids: Vec::new(),
            };
        }

        // Index both pages by id for O(1) membership checks.
        let previous_rows: HashMap<u64, &StoredRow> =
            previous.rows.iter().map(|row| (row.id, row)).collect();
        let current_rows: HashMap<u64, &StoredRow> =
            current.rows.iter().map(|row| (row.id, row)).collect();

        let mut inserted = Vec::new();
        let mut updated = Vec::new();
        for row in &current.rows {
            match previous_rows.get(&row.id) {
                None => inserted.push(row.clone()),
                Some(previous_row) if previous_row.data != row.data => updated.push(row.clone()),
                _ => {}
            }
        }

        // Sorted so the removal list is deterministic despite HashMap order.
        let mut removed_ids = previous_rows
            .keys()
            .filter(|id| !current_rows.contains_key(id))
            .copied()
            .collect::<Vec<_>>();
        removed_ids.sort_unstable();

        IncrementalDiff {
            from_token: Some(previous.token.clone()),
            to_token: current.token.clone(),
            full_resync: false,
            inserted,
            updated,
            removed_ids,
        }
    }
}
1003
1004impl RepoUnitOfWork for MemoryRepo {
1005 fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
1006 where
1007 F: FnOnce(&mut Self) -> DataResult<T>,
1008 {
1009 let snapshot_tables = self.tables.clone();
1010 let snapshot_next_id = self.next_id;
1011 match operation(self) {
1012 Ok(value) => Ok(value),
1013 Err(err) => {
1014 self.tables = snapshot_tables;
1015 self.next_id = snapshot_next_id;
1016 Err(err)
1017 }
1018 }
1019 }
1020}
1021
1022impl OptimisticLockingRepo for MemoryRepo {
1023 fn update_with_optimistic_lock(
1024 &mut self,
1025 table: &str,
1026 id: u64,
1027 patch: Row,
1028 lock: OptimisticLock,
1029 ) -> DataResult<StoredRow> {
1030 let existing = self.find(table, id)?.ok_or_else(|| {
1031 DataError::Query(format!(
1032 "row id {id} not found in table `{table}` for optimistic lock update"
1033 ))
1034 })?;
1035 let current_version = existing
1036 .data
1037 .get(lock.field.as_str())
1038 .and_then(Value::as_u64)
1039 .ok_or_else(|| {
1040 DataError::Query(format!(
1041 "row id {id} in `{table}` missing optimistic lock field `{}`",
1042 lock.field
1043 ))
1044 })?;
1045 if current_version != lock.expected_version {
1046 return Err(DataError::Query(format!(
1047 "optimistic lock conflict on `{table}` row id {id}: expected {} but found {}",
1048 lock.expected_version, current_version
1049 )));
1050 }
1051 let next_version = current_version
1052 .checked_add(lock.increment_by)
1053 .ok_or_else(|| {
1054 DataError::Query(format!(
1055 "optimistic lock version overflow on `{table}` row id {id}"
1056 ))
1057 })?;
1058
1059 let mut merged = existing.data;
1060 for (key, value) in patch {
1061 merged.insert(key, value);
1062 }
1063 merged.insert(lock.field, Value::from(next_version));
1064 self.update(table, id, merged)
1065 }
1066}
1067
1068fn materialize_rows(mut rows: Vec<StoredRow>, query: &Query) -> Vec<StoredRow> {
1069 if !query.filters.is_empty() {
1070 rows.retain(|row| {
1071 query
1072 .filters
1073 .iter()
1074 .all(|filter| matches_filter(row, filter))
1075 });
1076 }
1077
1078 for sort in query.sorts.iter().rev() {
1079 rows.sort_by(|left, right| compare_for_sort(left, right, sort.field.as_str()));
1080 if sort.direction == SortDirection::Desc {
1081 rows.reverse();
1082 }
1083 }
1084
1085 if let Some(keyset) = &query.keyset {
1086 rows = apply_keyset(rows, keyset);
1087 } else if let Some(pagination) = query.pagination {
1088 let offset = (pagination.page.saturating_sub(1)) * pagination.per_page;
1089 rows = rows
1090 .into_iter()
1091 .skip(offset)
1092 .take(pagination.per_page)
1093 .collect();
1094 }
1095
1096 rows
1097}
1098
1099fn apply_keyset(rows: Vec<StoredRow>, keyset: &crate::query::KeysetPagination) -> Vec<StoredRow> {
1100 let limit = keyset.limit.max(1);
1101 let Some(cursor) = keyset.cursor.as_ref() else {
1102 return rows.into_iter().take(limit).collect();
1103 };
1104
1105 match cursor.direction {
1106 KeysetDirection::Forward => rows
1107 .into_iter()
1108 .filter(|row| compare_row_cursor(row, cursor) == Ordering::Greater)
1109 .take(limit)
1110 .collect(),
1111 KeysetDirection::Backward => {
1112 let mut filtered = rows
1113 .into_iter()
1114 .filter(|row| compare_row_cursor(row, cursor) == Ordering::Less)
1115 .collect::<Vec<_>>();
1116 let keep_from = filtered.len().saturating_sub(limit);
1117 filtered.drain(0..keep_from);
1118 filtered
1119 }
1120 }
1121}
1122
1123fn compare_row_cursor(row: &StoredRow, cursor: &crate::query::KeysetCursor) -> Ordering {
1124 row.data
1125 .get(&cursor.field)
1126 .map(|value| compare_json_values(value, &cursor.value))
1127 .unwrap_or_else(|| row.id.cmp(&cursor.value.as_u64().unwrap_or_default()))
1128}
1129
1130fn compare_json_values(left: &Value, right: &Value) -> Ordering {
1131 match (left, right) {
1132 (Value::Number(_), Value::Number(_)) => {
1133 compare_numbers(left, right).unwrap_or(Ordering::Equal)
1134 }
1135 (Value::String(left), Value::String(right)) => left.cmp(right),
1136 (Value::Bool(left), Value::Bool(right)) => left.cmp(right),
1137 _ => left.to_string().cmp(&right.to_string()),
1138 }
1139}
1140
1141fn window_range_from_query(query: &Query, total_rows: usize) -> (usize, usize) {
1142 if let Some(window) = query.window {
1143 let span = window.span().max(1);
1144 let overscan = window.overscan;
1145 let start = window.start.saturating_sub(overscan);
1146 let limit = span.saturating_add(overscan.saturating_mul(2)).max(1);
1147 if total_rows == 0 {
1148 (0, limit)
1149 } else {
1150 (start.min(total_rows.saturating_sub(1)), limit)
1151 }
1152 } else if let Some(pagination) = query.pagination {
1153 (
1154 (pagination.page.saturating_sub(1)) * pagination.per_page,
1155 pagination.per_page.max(1),
1156 )
1157 } else if let Some(keyset) = &query.keyset {
1158 (
1159 query
1160 .window_token
1161 .as_ref()
1162 .map(|token| token.offset)
1163 .unwrap_or(0),
1164 keyset.limit.max(1),
1165 )
1166 } else {
1167 (
1168 query
1169 .window_token
1170 .as_ref()
1171 .map(|token| token.offset)
1172 .unwrap_or(0),
1173 query
1174 .window_token
1175 .as_ref()
1176 .map(|token| token.limit)
1177 .unwrap_or_else(|| total_rows.max(1)),
1178 )
1179 }
1180}
1181
1182fn encode_compact_rows(rows: &[StoredRow]) -> CompactRowsPayload {
1183 let mut columns = vec!["id".to_string()];
1184 for row in rows {
1185 for key in row.data.keys() {
1186 if !columns.iter().any(|column| column == key) {
1187 columns.push(key.clone());
1188 }
1189 }
1190 }
1191
1192 let mut encoded_rows = Vec::with_capacity(rows.len());
1193 for row in rows {
1194 let mut encoded = Vec::with_capacity(columns.len());
1195 for column in &columns {
1196 if column == "id" {
1197 encoded.push(Value::from(row.id));
1198 } else {
1199 encoded.push(row.data.get(column).cloned().unwrap_or(Value::Null));
1200 }
1201 }
1202 encoded_rows.push(encoded);
1203 }
1204
1205 CompactRowsPayload {
1206 columns,
1207 rows: encoded_rows,
1208 }
1209}
1210
/// Current wall-clock time as milliseconds since the Unix epoch; returns 0
/// if the system clock reads as earlier than the epoch.
fn now_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1217
1218fn matches_filter(row: &StoredRow, filter: &crate::query::Filter) -> bool {
1219 let Some(candidate) = row.data.get(&filter.field) else {
1220 return false;
1221 };
1222 match filter.op {
1223 FilterOperator::Eq => candidate == &filter.value,
1224 FilterOperator::Neq => candidate != &filter.value,
1225 FilterOperator::Contains => candidate
1226 .as_str()
1227 .zip(filter.value.as_str())
1228 .is_some_and(|(left, right)| left.contains(right)),
1229 FilterOperator::Gt => {
1230 compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Greater)
1231 }
1232 FilterOperator::Gte => compare_numbers(candidate, &filter.value)
1233 .is_some_and(|ord| ord == Ordering::Greater || ord == Ordering::Equal),
1234 FilterOperator::Lt => {
1235 compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Less)
1236 }
1237 FilterOperator::Lte => compare_numbers(candidate, &filter.value)
1238 .is_some_and(|ord| ord == Ordering::Less || ord == Ordering::Equal),
1239 }
1240}
1241
1242fn compare_for_sort(left: &StoredRow, right: &StoredRow, field: &str) -> Ordering {
1243 let left_value = left.data.get(field);
1244 let right_value = right.data.get(field);
1245 match (left_value, right_value) {
1246 (Some(Value::Number(left_num)), Some(Value::Number(right_num))) => left_num
1247 .as_f64()
1248 .partial_cmp(&right_num.as_f64())
1249 .unwrap_or(Ordering::Equal),
1250 (Some(Value::String(left_text)), Some(Value::String(right_text))) => {
1251 left_text.cmp(right_text)
1252 }
1253 _ => left.id.cmp(&right.id),
1254 }
1255}
1256
1257fn compare_numbers(left: &Value, right: &Value) -> Option<Ordering> {
1258 left.as_f64()
1259 .zip(right.as_f64())
1260 .and_then(|(left, right)| left.partial_cmp(&right))
1261}
1262
#[cfg(test)]
mod tests {
    //! Unit tests for the in-memory repository: adapter selection, CRUD,
    //! tenant scoping, filtering/sorting/pagination, keyset and windowed
    //! reads, incremental diffs, transactions, optimistic locking, and
    //! query observability.
    //
    // Fixes in this revision:
    // - `materialize_incremental_diff(&previous, &current)` had its second
    //   argument mangled by an HTML-entity mojibake (`&curren;` + `t`),
    //   which did not compile; restored to `&current`.
    // - Uses the `Filter` import consistently instead of mixing in
    //   `crate::Filter` paths.
    use super::{
        adapter_for, DatabaseConfig, MemoryRepo, OptimisticLock, OptimisticLockingRepo,
        QueryObservabilityPolicy, QueryObservabilityTracker, Repo, RepoUnitOfWork, Row,
        TenantPolicyDecision, TenantRepoOperation, TenantScopedRepo,
    };
    use crate::{
        AdapterKind, DataError, Filter, FilterOperator, Query, QueryContext, SortDirection,
        WireFormatProfile,
    };
    use serde_json::json;

    // Adapter-driven construction: a repo backed by the sqlite driver accepts
    // inserts and filtered, sorted reads.
    #[test]
    fn memory_repo_works_for_adapter_selection() {
        let mut repo = MemoryRepo::new(
            adapter_for(&DatabaseConfig {
                adapter: AdapterKind::Sqlite,
                url: None,
                url_env: None,
            })
            .unwrap(),
        );
        let mut row = Row::new();
        row.insert("title".to_string(), json!("Alpha"));
        row.insert("score".to_string(), json!(10));
        repo.insert("posts", row).unwrap();

        let rows = repo
            .list(
                "posts",
                &Query::new()
                    .where_filter(Filter::contains("title", "Al"))
                    .order_by("score", SortDirection::Desc),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].data.get("title"), Some(&json!("Alpha")));
    }

    // `AdapterKind::None` is rejected; every concrete kind yields a driver
    // reporting the same kind back.
    #[test]
    fn adapter_for_rejects_none_and_selects_expected_driver() {
        let none_result = adapter_for(&DatabaseConfig {
            adapter: AdapterKind::None,
            url: None,
            url_env: None,
        });
        assert!(matches!(none_result, Err(DataError::Adapter(_))));

        for kind in [
            AdapterKind::Postgres,
            AdapterKind::MySql,
            AdapterKind::Sqlite,
            AdapterKind::SingleStore,
            AdapterKind::ClickHouse,
            AdapterKind::BigQuery,
            AdapterKind::OpenSearch,
        ] {
            let driver = adapter_for(&DatabaseConfig {
                adapter: kind,
                url: None,
                url_env: None,
            })
            .expect("driver should be created");
            assert_eq!(driver.kind(), kind);
        }
    }

    // find/update/delete behavior for present rows, missing ids, and missing
    // tables: lookups return None, mutations return Query errors.
    #[test]
    fn update_delete_and_find_cover_missing_rows() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));

        let mut row = Row::new();
        row.insert("title".to_string(), json!("Draft"));
        let inserted = repo.insert("posts", row).expect("insert should work");

        assert_eq!(
            repo.find("posts", inserted.id)
                .expect("find should not fail")
                .map(|it| it.id),
            Some(inserted.id)
        );
        assert!(repo
            .find("posts", 999)
            .expect("find should not fail")
            .is_none());
        assert!(repo
            .find("missing_table", inserted.id)
            .expect("find should not fail")
            .is_none());

        let mut updated = Row::new();
        updated.insert("title".to_string(), json!("Published"));
        let updated_row = repo
            .update("posts", inserted.id, updated)
            .expect("update should work");
        assert_eq!(updated_row.data.get("title"), Some(&json!("Published")));

        let update_err = repo
            .update("posts", 404, Row::new())
            .expect_err("missing row should fail update");
        assert!(matches!(update_err, DataError::Query(_)));

        repo.delete("posts", inserted.id)
            .expect("delete should remove row");
        let delete_err = repo
            .delete("posts", inserted.id)
            .expect_err("deleting missing row should fail");
        assert!(matches!(delete_err, DataError::Query(_)));
    }

    // Row-level isolation: tenant B cannot read tenant A's rows.
    #[test]
    fn tenant_scoped_repo_enforces_row_level_isolation() {
        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut scoped = TenantScopedRepo::new(repo);

        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");

        let mut first = Row::new();
        first.insert("name".to_string(), json!("Acme"));
        let inserted = scoped
            .insert_scoped("accounts", &tenant_a, first)
            .expect("insert tenant-a row");

        let found = scoped
            .find_scoped("accounts", inserted.id, &tenant_a)
            .expect("find tenant-a row");
        assert!(found.is_some());

        let denied = scoped
            .find_scoped("accounts", inserted.id, &tenant_b)
            .expect_err("cross-tenant find should be denied");
        assert!(denied.to_string().contains("tenant_row_mismatch"));
    }

    // A context without a tenant id is rejected up front.
    #[test]
    fn tenant_scoped_repo_requires_tenant_context_by_default() {
        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut scoped = TenantScopedRepo::new(repo);

        let mut row = Row::new();
        row.insert("name".to_string(), json!("NoTenant"));
        let err = scoped
            .insert_scoped("accounts", &QueryContext::default(), row)
            .expect_err("missing tenant context should be denied");
        assert!(err.to_string().contains("tenant_context_required"));
    }

    // A custom policy hook can veto specific operations (here: Delete).
    #[test]
    fn tenant_scoped_repo_policy_hook_can_reject_operations() {
        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut scoped = TenantScopedRepo::new(repo).with_policy_hook(|ctx| {
            if ctx.operation == TenantRepoOperation::Delete {
                TenantPolicyDecision::deny("tenant_delete_denied", "tenant delete denied")
            } else {
                TenantPolicyDecision::allow()
            }
        });

        let tenant = QueryContext::default().with_tenant_id("tenant-a");
        let mut row = Row::new();
        row.insert("name".to_string(), json!("Acme"));
        let inserted = scoped
            .insert_scoped("accounts", &tenant, row)
            .expect("insert tenant row");

        let err = scoped
            .delete_scoped("accounts", inserted.id, &tenant)
            .expect_err("policy should reject delete");
        assert!(err.to_string().contains("tenant_delete_denied"));
    }

    // Scoped updates stamp the tenant id, reject payloads claiming another
    // tenant, and scoped list/window reads only see the caller's rows.
    #[test]
    fn tenant_scoped_repo_update_and_window_list_enforce_scope() {
        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut scoped = TenantScopedRepo::new(repo);

        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");

        let mut first = Row::new();
        first.insert("name".to_string(), json!("Acme"));
        first.insert("score".to_string(), json!(10));
        let inserted = scoped
            .insert_scoped("accounts", &tenant_a, first)
            .expect("insert tenant-a row");

        let mut update = Row::new();
        update.insert("name".to_string(), json!("Acme Prime"));
        update.insert("score".to_string(), json!(20));
        let updated = scoped
            .update_scoped("accounts", inserted.id, &tenant_a, update)
            .expect("update tenant-a row");
        assert_eq!(updated.data.get("tenant_id"), Some(&json!("tenant-a")));

        let mut bad_update = Row::new();
        bad_update.insert("name".to_string(), json!("Cross"));
        bad_update.insert("tenant_id".to_string(), json!("tenant-b"));
        let err = scoped
            .update_scoped("accounts", inserted.id, &tenant_a, bad_update)
            .expect_err("tenant mismatch on row payload should fail");
        assert!(err.to_string().contains("tenant_row_mismatch"));

        let list = scoped
            .list_scoped(
                "accounts",
                &Query::new().order_by("score", SortDirection::Asc),
                &tenant_a,
            )
            .expect("list scoped");
        assert_eq!(list.len(), 1);

        let window = scoped
            .list_window_scoped(
                "accounts",
                &Query::new()
                    .order_by("score", SortDirection::Asc)
                    .window(1, 5, 2),
                &tenant_a,
            )
            .expect("list window scoped");
        assert_eq!(window.rows.len(), 1);

        let denied = scoped
            .find_scoped("accounts", inserted.id, &tenant_b)
            .expect_err("cross-tenant find should still fail");
        assert!(denied.to_string().contains("tenant_row_mismatch"));
    }

    // Exercises every filter operator, sort fallback on unknown fields,
    // pagination, and multi-key sorting.
    #[test]
    fn list_applies_filters_sorts_and_pagination() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));

        let mut alpha = Row::new();
        alpha.insert("title".to_string(), json!("Alpha"));
        alpha.insert("score".to_string(), json!(10));
        alpha.insert("tag".to_string(), json!("core"));
        repo.insert("posts", alpha).expect("insert alpha");

        let mut beta = Row::new();
        beta.insert("title".to_string(), json!("Beta"));
        beta.insert("score".to_string(), json!(20));
        beta.insert("tag".to_string(), json!("ops"));
        repo.insert("posts", beta).expect("insert beta");

        let mut gamma = Row::new();
        gamma.insert("title".to_string(), json!("Gamma"));
        gamma.insert("score".to_string(), json!(15));
        gamma.insert("tag".to_string(), json!(123));
        repo.insert("posts", gamma).expect("insert gamma");

        let eq_rows = repo
            .list(
                "posts",
                &Query::new().where_filter(Filter::eq("title", json!("Alpha"))),
            )
            .expect("eq filter");
        assert_eq!(eq_rows.len(), 1);
        assert_eq!(eq_rows[0].data.get("title"), Some(&json!("Alpha")));

        let neq_rows = repo
            .list(
                "posts",
                &Query::new().where_filter(Filter {
                    field: "title".to_string(),
                    op: FilterOperator::Neq,
                    value: json!("Alpha"),
                }),
            )
            .expect("neq filter");
        assert_eq!(neq_rows.len(), 2);

        let contains_rows = repo
            .list(
                "posts",
                &Query::new().where_filter(Filter::contains("title", "mm")),
            )
            .expect("contains filter");
        assert_eq!(contains_rows.len(), 1);
        assert_eq!(contains_rows[0].data.get("title"), Some(&json!("Gamma")));

        // Contains on a numeric field must not match: substring search is
        // string-only.
        let contains_non_string_rows = repo
            .list(
                "posts",
                &Query::new().where_filter(Filter::contains("tag", "2")),
            )
            .expect("contains on mixed type");
        assert!(contains_non_string_rows.is_empty());

        for (op, expected_titles) in [
            (FilterOperator::Gt, vec!["Beta"]),
            (FilterOperator::Gte, vec!["Beta", "Gamma"]),
            (FilterOperator::Lt, vec!["Alpha"]),
            (FilterOperator::Lte, vec!["Alpha", "Gamma"]),
        ] {
            let rows = repo
                .list(
                    "posts",
                    &Query::new().where_filter(Filter {
                        field: "score".to_string(),
                        op,
                        value: json!(15),
                    }),
                )
                .expect("numeric filter");
            let titles: Vec<&str> = rows
                .iter()
                .map(|row| {
                    row.data
                        .get("title")
                        .and_then(|value| value.as_str())
                        .expect("title")
                })
                .collect();
            assert_eq!(titles, expected_titles);
        }

        // Sorting on a field no row has falls back to id order.
        let unknown_field_sort = repo
            .list(
                "posts",
                &Query::new()
                    .order_by("missing", SortDirection::Desc)
                    .paginate(1, 2),
            )
            .expect("fallback sort");
        assert_eq!(unknown_field_sort.len(), 2);
        assert_eq!(unknown_field_sort[0].id, 3);
        assert_eq!(unknown_field_sort[1].id, 2);

        let score_sort = repo
            .list(
                "posts",
                &Query::new()
                    .order_by("score", SortDirection::Desc)
                    .order_by("title", SortDirection::Asc),
            )
            .expect("score sort");
        let score_titles: Vec<&str> = score_sort
            .iter()
            .map(|row| {
                row.data
                    .get("title")
                    .and_then(|value| value.as_str())
                    .expect("title")
            })
            .collect();
        assert_eq!(score_titles, vec!["Beta", "Gamma", "Alpha"]);
    }

    // Keyset cursors page forward (after) and backward (before) through a
    // sorted column.
    #[test]
    fn keyset_pagination_supports_forward_and_backward_windows() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));

        for score in 1..=6 {
            let mut row = Row::new();
            row.insert("title".to_string(), json!(format!("R{score}")));
            row.insert("score".to_string(), json!(score));
            repo.insert("scores", row).expect("insert score row");
        }

        let forward_rows = repo
            .list(
                "scores",
                &Query::new()
                    .order_by("score", SortDirection::Asc)
                    .keyset_after("score", json!(2), 3),
            )
            .expect("keyset forward");
        let forward_scores: Vec<i64> = forward_rows
            .iter()
            .map(|row| {
                row.data
                    .get("score")
                    .and_then(|value| value.as_i64())
                    .expect("score")
            })
            .collect();
        assert_eq!(forward_scores, vec![3, 4, 5]);

        let backward_rows = repo
            .list(
                "scores",
                &Query::new()
                    .order_by("score", SortDirection::Asc)
                    .keyset_before("score", json!(5), 2),
            )
            .expect("keyset backward");
        let backward_scores: Vec<i64> = backward_rows
            .iter()
            .map(|row| {
                row.data
                    .get("score")
                    .and_then(|value| value.as_i64())
                    .expect("score")
            })
            .collect();
        assert_eq!(backward_scores, vec![3, 4]);
    }

    // Window reads honor overscan (span 140 + 2*20 = 180, offset 100-20 = 80),
    // carry a matching fingerprint/token, and emit compact rows when asked.
    #[test]
    fn list_window_emits_tokens_and_compact_payload() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));

        for score in 0..300 {
            let mut row = Row::new();
            row.insert("score".to_string(), json!(score));
            row.insert(
                "tenant".to_string(),
                json!(if score % 2 == 0 { "a" } else { "b" }),
            );
            repo.insert("accounts", row).expect("insert account");
        }

        let query = Query::new()
            .order_by("score", SortDirection::Asc)
            .window(100, 240, 20)
            .wire_format(WireFormatProfile::Compact);
        let page = repo.list_window("accounts", &query).expect("list window");

        assert_eq!(page.total_rows, 300);
        assert_eq!(page.offset, 80);
        assert_eq!(page.limit, 180);
        assert_eq!(page.rows.len(), 180);
        assert_eq!(page.query_fingerprint, query.fingerprint());
        assert_eq!(page.token.query_fingerprint, query.fingerprint());
        let compact = page.compact_rows.expect("compact payload");
        assert!(compact.columns.contains(&"id".to_string()));
        assert!(compact.columns.contains(&"score".to_string()));
        assert_eq!(compact.rows.len(), page.rows.len());
    }

    // A window token minted for one query fingerprint is rejected when
    // replayed against a different query.
    #[test]
    fn list_window_rejects_mismatched_query_token() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut row = Row::new();
        row.insert("score".to_string(), json!(1));
        repo.insert("accounts", row).expect("insert account");

        let base = Query::new()
            .where_filter(Filter::eq("score", json!(1)))
            .paginate(1, 20);
        let stale_token = base.next_window_token(0, 20, 42, 1);
        let mismatched = Query::new()
            .where_filter(Filter::eq("score", json!(2)))
            .paginate(1, 20)
            .with_window_token(stale_token);

        let err = repo
            .list_window("accounts", &mismatched)
            .expect_err("window token mismatch should fail");
        assert!(matches!(err, DataError::Query(_)));
        assert!(err.to_string().contains("window token"));
    }

    // Diffing two pages of the same query classifies rows into inserted,
    // updated, and removed sets without forcing a full resync.
    #[test]
    fn incremental_diff_tracks_insert_update_and_remove() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));

        for (title, score) in [("A", 10), ("B", 20), ("C", 30)] {
            let mut row = Row::new();
            row.insert("title".to_string(), json!(title));
            row.insert("score".to_string(), json!(score));
            repo.insert("accounts", row).expect("insert");
        }

        let query = Query::new()
            .order_by("score", SortDirection::Asc)
            .paginate(1, 10);
        let previous = repo.list_window("accounts", &query).expect("previous page");

        let mut updated_row = Row::new();
        updated_row.insert("title".to_string(), json!("B"));
        updated_row.insert("score".to_string(), json!(25));
        repo.update("accounts", 2, updated_row).expect("update row");
        repo.delete("accounts", 3).expect("delete row");
        let mut inserted_row = Row::new();
        inserted_row.insert("title".to_string(), json!("D"));
        inserted_row.insert("score".to_string(), json!(35));
        repo.insert("accounts", inserted_row).expect("insert row");

        let current = repo.list_window("accounts", &query).expect("current page");
        let diff = repo.materialize_incremental_diff(&previous, &current);

        assert!(!diff.full_resync);
        assert_eq!(diff.inserted.len(), 1);
        assert_eq!(diff.inserted[0].id, 4);
        assert_eq!(diff.updated.len(), 1);
        assert_eq!(diff.updated[0].id, 2);
        assert_eq!(diff.removed_ids, vec![3]);
    }

    // A transaction that returns Err must leave the repo exactly as it was.
    #[test]
    fn unit_of_work_rolls_back_on_error() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut row = Row::new();
        row.insert("title".to_string(), json!("before"));
        let baseline = repo.insert("posts", row).expect("seed row");
        let original_count = repo
            .list("posts", &Query::new())
            .expect("list before")
            .len();

        let result: Result<(), DataError> = repo.transaction(|inner| {
            let mut patch = Row::new();
            patch.insert("title".to_string(), json!("mutated"));
            inner.update("posts", baseline.id, patch)?;
            Err(DataError::Query("force rollback".to_string()))
        });
        assert!(result.is_err());

        let after = repo
            .find("posts", baseline.id)
            .expect("find after rollback");
        assert_eq!(
            after
                .as_ref()
                .and_then(|row| row.data.get("title"))
                .and_then(|value| value.as_str()),
            Some("before")
        );
        assert_eq!(
            repo.list("posts", &Query::new()).expect("list after").len(),
            original_count
        );
    }

    // Optimistic locking: a matching expected version succeeds and bumps the
    // version field; a stale version conflicts.
    #[test]
    fn optimistic_lock_update_enforces_version_match_and_bumps_version() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        let mut row = Row::new();
        row.insert("title".to_string(), json!("v1"));
        row.insert("lock_version".to_string(), json!(1u64));
        let inserted = repo.insert("posts", row).expect("seed row");

        let mut patch = Row::new();
        patch.insert("title".to_string(), json!("v2"));
        let updated = repo
            .update_with_optimistic_lock(
                "posts",
                inserted.id,
                patch,
                OptimisticLock::new("lock_version", 1),
            )
            .expect("optimistic update should pass");
        assert_eq!(updated.data.get("title"), Some(&json!("v2")));
        assert_eq!(updated.data.get("lock_version"), Some(&json!(2u64)));

        let err = repo
            .update_with_optimistic_lock(
                "posts",
                inserted.id,
                Row::new(),
                OptimisticLock::new("lock_version", 1),
            )
            .expect_err("stale version should fail");
        assert!(err.to_string().contains("optimistic lock conflict"));
    }

    // Observability: a zero slow-query threshold flags every query; a repeat
    // of the same query within the window raises an N+1 hint; adding a
    // preload clears the N+1 suspicion.
    #[test]
    fn query_observability_tracker_surfaces_slow_and_n_plus_one_hints() {
        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
        for index in 0..5 {
            let mut row = Row::new();
            row.insert("title".to_string(), json!(format!("R{index}")));
            repo.insert("posts", row).expect("insert");
        }

        let query = Query::new().paginate(1, 2);
        let context = QueryContext::default()
            .with_correlation_id("corr-57")
            .with_request_id("req-57");
        let mut tracker = QueryObservabilityTracker::new(QueryObservabilityPolicy {
            slow_query_threshold_ms: 0,
            n_plus_one_window_size: 6,
            n_plus_one_repeat_threshold: 2,
        });

        let (_, first) = repo
            .list_observed("posts", &query, &context, &mut tracker)
            .expect("first observed list");
        assert!(first.slow_query);
        assert!(!first.potential_n_plus_one);
        assert_eq!(first.correlation_id.as_deref(), Some("corr-57"));
        assert_eq!(first.request_id.as_deref(), Some("req-57"));

        let (_, second) = repo
            .list_observed("posts", &query, &context, &mut tracker)
            .expect("second observed list");
        assert!(second.potential_n_plus_one);
        assert!(second
            .hints
            .iter()
            .any(|hint| hint.contains("possible N+1")));

        let query_with_preload = query.clone().preload("author");
        let (_, third) = repo
            .list_observed("posts", &query_with_preload, &context, &mut tracker)
            .expect("observed list with preload");
        assert!(!third.potential_n_plus_one);
    }
}