Skip to main content

shelly_data/
repo.rs

1use crate::{
2    adapter::{AdapterKind, DatabaseConfig},
3    error::{DataError, DataResult},
4    query::{
5        Filter, FilterOperator, KeysetDirection, Query, SortDirection, WindowToken,
6        WireFormatProfile,
7    },
8    QueryContext,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::{
13    cmp::Ordering,
14    collections::{BTreeMap, HashMap},
15    sync::Arc,
16    time::{Instant, SystemTime, UNIX_EPOCH},
17};
18use tracing::{info, warn};
19
/// A table row: column name mapped to its JSON value (iterated in key order by the `BTreeMap`).
pub type Row = BTreeMap<String, Value>;
21
/// A [`Row`] together with the numeric identifier it is stored under.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StoredRow {
    /// Identifier used to address the row (e.g. by `Repo::find`).
    pub id: u64,
    /// Column values of the row.
    pub data: Row,
}
27
/// Column-oriented encoding of a set of rows: one shared list of column names plus
/// one vector of values per row.
///
/// NOTE(review): presumably every inner vector of `rows` is positionally aligned
/// with `columns`; the encoder is not visible in this part of the file — confirm.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactRowsPayload {
    /// Column names shared by all rows.
    pub columns: Vec<String>,
    /// One vector of values per row.
    pub rows: Vec<Vec<Value>>,
}
33
/// One window (page) of query results, as returned by `Repo::list_window`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowPage {
    /// Number of matching rows skipped before the first row of this window.
    pub offset: usize,
    /// Maximum number of rows in this window (`MemoryRepo` never reports less than 1).
    pub limit: usize,
    /// Number of rows matching the query before the window was cut out.
    pub total_rows: usize,
    /// The rows inside the window.
    pub rows: Vec<StoredRow>,
    /// `Query::fingerprint()` of the query that produced this page.
    pub query_fingerprint: String,
    /// Token produced by `Query::next_window_token` for this page.
    pub token: WindowToken,
    /// Wire format requested by the query.
    pub wire_format: WireFormatProfile,
    /// Compact encoding of `rows`; `MemoryRepo` fills it only for
    /// `WireFormatProfile::Compact`.
    pub compact_rows: Option<CompactRowsPayload>,
}
45
/// Difference between two [`WindowPage`]s, as produced by
/// `Repo::materialize_incremental_diff`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IncrementalDiff {
    /// Token of the page the diff starts from, if any.
    pub from_token: Option<WindowToken>,
    /// Token of the page the diff leads to.
    pub to_token: WindowToken,
    /// `true` when the two pages are not comparable and the consumer should replace
    /// its state with `inserted` (`MemoryRepo` sets it when the query fingerprint or
    /// wire format differs).
    pub full_resync: bool,
    /// Rows that are new in the target page (all of its rows on a full resync).
    pub inserted: Vec<StoredRow>,
    /// Rows present in both pages whose data changed.
    pub updated: Vec<StoredRow>,
    /// Ids present in the source page but missing from the target page.
    pub removed_ids: Vec<u64>,
}
55
/// Parameters for an optimistic-lock guarded update
/// (see `OptimisticLockingRepo::update_with_optimistic_lock`).
///
/// NOTE(review): the code that consumes these values is not visible in this part of
/// the file; the field descriptions below are inferred from the names — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OptimisticLock {
    /// Name of the row field holding the version counter (presumably).
    pub field: String,
    /// Version the caller expects the row to currently have (presumably).
    pub expected_version: u64,
    /// Amount added to the version on success (presumably); the builder keeps it >= 1.
    pub increment_by: u64,
}
62
63impl OptimisticLock {
64    pub fn new(field: impl Into<String>, expected_version: u64) -> Self {
65        Self {
66            field: field.into(),
67            expected_version,
68            increment_by: 1,
69        }
70    }
71
72    pub fn increment_by(mut self, increment_by: u64) -> Self {
73        self.increment_by = increment_by.max(1);
74        self
75    }
76}
77
/// Thresholds used by `QueryObservabilityTracker` when classifying a query.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservabilityPolicy {
    /// A query whose duration is at least this many milliseconds is flagged as slow.
    pub slow_query_threshold_ms: u64,
    /// Maximum number of recent fingerprints remembered per observation scope.
    pub n_plus_one_window_size: usize,
    /// Number of identical fingerprints inside the window (for a query without
    /// preloads) at which a potential N+1 is flagged.
    pub n_plus_one_repeat_threshold: usize,
}
84
85impl Default for QueryObservabilityPolicy {
86    fn default() -> Self {
87        Self {
88            slow_query_threshold_ms: 250,
89            n_plus_one_window_size: 12,
90            n_plus_one_repeat_threshold: 3,
91        }
92    }
93}
94
/// Result of observing a single query with `QueryObservabilityTracker::observe`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservation {
    /// Table the query ran against.
    pub table: String,
    /// Operation label supplied by the caller (e.g. `"list"`).
    pub operation: String,
    /// `Query::fingerprint()` of the observed query.
    pub query_fingerprint: String,
    /// Duration reported by the caller, in milliseconds.
    pub duration_ms: u64,
    /// Whether the duration reached the policy's slow-query threshold.
    pub slow_query: bool,
    /// Whether the query had no preloads and its fingerprint repeated often enough.
    pub potential_n_plus_one: bool,
    /// How many times the fingerprint occurs in the current window (this query included).
    pub repeated_fingerprint_count: usize,
    /// Preloads requested by the query.
    pub preloads: Vec<String>,
    /// Trace id copied from the query context.
    pub trace_id: Option<String>,
    /// Correlation id copied from the query context.
    pub correlation_id: Option<String>,
    /// Request id copied from the query context.
    pub request_id: Option<String>,
    /// Human-readable hints, one per raised flag.
    pub hints: Vec<String>,
}
110
/// Tracks recently seen query fingerprints per scope to flag slow queries and
/// potential N+1 patterns.
#[derive(Debug, Default, Clone)]
pub struct QueryObservabilityTracker {
    /// Thresholds applied by `observe`.
    policy: QueryObservabilityPolicy,
    /// Recent fingerprints keyed by observation scope (table, operation and a
    /// correlation-like id). Each list is capped by the policy's window size; the
    /// number of scopes is not capped here.
    recent_fingerprints: HashMap<String, Vec<String>>,
}
116
117impl QueryObservabilityTracker {
118    pub fn new(policy: QueryObservabilityPolicy) -> Self {
119        Self {
120            policy,
121            recent_fingerprints: HashMap::new(),
122        }
123    }
124
125    pub fn policy(&self) -> &QueryObservabilityPolicy {
126        &self.policy
127    }
128
129    pub fn observe(
130        &mut self,
131        table: &str,
132        operation: &str,
133        query: &Query,
134        context: &QueryContext,
135        duration_ms: u64,
136    ) -> QueryObservation {
137        let fingerprint = query.fingerprint();
138        let scope_key = observation_scope_key(table, operation, context);
139        let history = self.recent_fingerprints.entry(scope_key).or_default();
140        history.push(fingerprint.clone());
141        if history.len() > self.policy.n_plus_one_window_size {
142            let drain_count = history.len() - self.policy.n_plus_one_window_size;
143            history.drain(0..drain_count);
144        }
145
146        let repeated_fingerprint_count = history
147            .iter()
148            .filter(|value| *value == &fingerprint)
149            .count();
150        let potential_n_plus_one = query.preloads.is_empty()
151            && repeated_fingerprint_count >= self.policy.n_plus_one_repeat_threshold;
152        let slow_query = duration_ms >= self.policy.slow_query_threshold_ms;
153
154        let mut hints = Vec::new();
155        if slow_query {
156            hints.push(format!(
157                "slow query ({}ms >= {}ms); consider indexes and narrower filters",
158                duration_ms, self.policy.slow_query_threshold_ms
159            ));
160        }
161        if potential_n_plus_one {
162            hints.push(format!(
163                "repeated query fingerprint observed {} times without preloads; possible N+1",
164                repeated_fingerprint_count
165            ));
166        }
167
168        QueryObservation {
169            table: table.to_string(),
170            operation: operation.to_string(),
171            query_fingerprint: fingerprint,
172            duration_ms,
173            slow_query,
174            potential_n_plus_one,
175            repeated_fingerprint_count,
176            preloads: query.preloads.clone(),
177            trace_id: context.trace_id.clone(),
178            correlation_id: context.correlation_id().map(ToString::to_string),
179            request_id: context.request_id().map(ToString::to_string),
180            hints,
181        }
182    }
183}
184
/// A database driver that can report which backend it represents.
pub trait AdapterDriver: Send + Sync {
    /// The backend kind of this driver.
    fn kind(&self) -> AdapterKind;
}
188
/// Stateless driver marker that reports `AdapterKind::Postgres`.
#[derive(Debug, Default, Clone, Copy)]
pub struct PostgresAdapter;

impl AdapterDriver for PostgresAdapter {
    /// Always `AdapterKind::Postgres`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::Postgres
    }
}
197
/// Stateless driver marker that reports `AdapterKind::MySql`.
#[derive(Debug, Default, Clone, Copy)]
pub struct MySqlAdapter;

impl AdapterDriver for MySqlAdapter {
    /// Always `AdapterKind::MySql`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::MySql
    }
}
206
/// Stateless driver marker that reports `AdapterKind::Sqlite`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteAdapter;

impl AdapterDriver for SqliteAdapter {
    /// Always `AdapterKind::Sqlite`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::Sqlite
    }
}
215
/// Stateless driver marker that reports `AdapterKind::SingleStore`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SingleStoreDriver;

impl AdapterDriver for SingleStoreDriver {
    /// Always `AdapterKind::SingleStore`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::SingleStore
    }
}
224
/// Stateless driver marker that reports `AdapterKind::ClickHouse`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ClickHouseDriver;

impl AdapterDriver for ClickHouseDriver {
    /// Always `AdapterKind::ClickHouse`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::ClickHouse
    }
}
233
/// Stateless driver marker that reports `AdapterKind::BigQuery`.
#[derive(Debug, Default, Clone, Copy)]
pub struct BigQueryDriver;

impl AdapterDriver for BigQueryDriver {
    /// Always `AdapterKind::BigQuery`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::BigQuery
    }
}
242
/// Stateless driver marker that reports `AdapterKind::OpenSearch`.
#[derive(Debug, Default, Clone, Copy)]
pub struct OpenSearchAdapterDriver;

impl AdapterDriver for OpenSearchAdapterDriver {
    /// Always `AdapterKind::OpenSearch`.
    fn kind(&self) -> AdapterKind {
        AdapterKind::OpenSearch
    }
}
251
252pub fn adapter_for(config: &DatabaseConfig) -> DataResult<Box<dyn AdapterDriver>> {
253    match config.adapter {
254        AdapterKind::Postgres => Ok(Box::new(PostgresAdapter)),
255        AdapterKind::MySql => Ok(Box::new(MySqlAdapter)),
256        AdapterKind::Sqlite => Ok(Box::new(SqliteAdapter)),
257        AdapterKind::SingleStore => Ok(Box::new(SingleStoreDriver)),
258        AdapterKind::ClickHouse => Ok(Box::new(ClickHouseDriver)),
259        AdapterKind::BigQuery => Ok(Box::new(BigQueryDriver)),
260        AdapterKind::OpenSearch => Ok(Box::new(OpenSearchAdapterDriver)),
261        AdapterKind::None => Err(DataError::Adapter(
262            "database adapter is `none`; select a supported backend in shelly.data.toml"
263                .to_string(),
264        )),
265    }
266}
267
/// Storage abstraction over named tables of [`StoredRow`]s.
pub trait Repo {
    /// Reports which adapter backs this repository.
    fn adapter_kind(&self) -> AdapterKind;
    /// Stores `data` as a new row in `table` and returns the stored row.
    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow>;
    /// Writes `data` to the row `id` of `table` and returns the stored row.
    ///
    /// NOTE(review): `MemoryRepo` replaces the whole row and fails when the id is
    /// unknown; whether other implementations behave the same is not visible here.
    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow>;
    /// Removes the row `id` from `table` (`MemoryRepo` fails when the id is unknown).
    fn delete(&mut self, table: &str, id: u64) -> DataResult<()>;
    /// Looks up the row `id` in `table`; `Ok(None)` when there is no such row.
    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>>;
    /// Returns the rows of `table` selected by `query`.
    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>>;
    /// Returns one window of the rows of `table` selected by `query`, with paging metadata.
    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage>;
    /// Computes what changed between two pages (`previous` -> `current`).
    fn materialize_incremental_diff(
        &self,
        previous: &WindowPage,
        current: &WindowPage,
    ) -> IncrementalDiff;
}
282
/// A [`Repo`] that can run a group of operations as one unit of work.
pub trait RepoUnitOfWork: Repo {
    /// Runs `operation` against this repository and returns its result.
    ///
    /// NOTE(review): presumably changes are rolled back when `operation` returns
    /// `Err`; no complete implementation is visible in this part of the file — confirm.
    fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
    where
        F: FnOnce(&mut Self) -> DataResult<T>;
}
288
/// A [`Repo`] that supports updates guarded by an [`OptimisticLock`].
pub trait OptimisticLockingRepo: Repo {
    /// Applies `patch` to the row `id` of `table` under the conditions of `lock`
    /// and returns the stored row.
    ///
    /// NOTE(review): presumably fails when the stored version differs from
    /// `lock.expected_version`; no implementation is visible here — confirm.
    fn update_with_optimistic_lock(
        &mut self,
        table: &str,
        id: u64,
        patch: Row,
        lock: OptimisticLock,
    ) -> DataResult<StoredRow>;
}
298
/// The repository operation a tenant policy decision is requested for.
///
/// Serialized in `snake_case` (e.g. `list_window`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantRepoOperation {
    /// `TenantScopedRepo::insert_scoped`.
    Insert,
    /// `TenantScopedRepo::update_scoped`.
    Update,
    /// `TenantScopedRepo::delete_scoped`.
    Delete,
    /// `TenantScopedRepo::find_scoped`.
    Find,
    /// `TenantScopedRepo::list_scoped`.
    List,
    /// `TenantScopedRepo::list_window_scoped`.
    ListWindow,
}
309
/// Facts handed to a tenant policy hook for one scoped operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyContext {
    /// Table the operation targets.
    pub table: String,
    /// Kind of operation being performed.
    pub operation: TenantRepoOperation,
    /// Normalized tenant id of the caller, if any.
    pub tenant_id: Option<String>,
    /// Normalized tenant id found on the row involved, if any
    /// (always `None` for list operations).
    pub row_tenant_id: Option<String>,
    /// Id of the row involved, when the operation targets a specific row.
    pub row_id: Option<u64>,
}
318
/// Outcome of a tenant policy hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyDecision {
    /// Whether the operation may proceed.
    pub allowed: bool,
    /// Machine-readable reason for a denial.
    pub code: Option<String>,
    /// Human-readable reason for a denial.
    pub message: Option<String>,
}

impl TenantPolicyDecision {
    /// A decision that lets the operation proceed; carries no code or message.
    pub fn allow() -> Self {
        let (code, message) = (None, None);
        Self {
            allowed: true,
            code,
            message,
        }
    }

    /// A decision that rejects the operation with the given `code` and `message`.
    pub fn deny(code: impl Into<String>, message: impl Into<String>) -> Self {
        let code = Some(code.into());
        let message = Some(message.into());
        Self {
            allowed: false,
            code,
            message,
        }
    }
}
343
/// Shareable, thread-safe callback that decides whether a scoped operation may proceed.
pub type TenantPolicyHook = Arc<dyn Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync>;
345
/// Settings of a `TenantScopedRepo`.
#[derive(Clone)]
pub struct TenantRepoConfig {
    /// Name of the row field that stores the owning tenant id.
    pub tenant_field: String,
    /// When `true`, scoped operations fail if the query context carries no tenant id.
    pub require_tenant_context: bool,
    /// Optional extra policy consulted for every scoped operation.
    pub policy_hook: Option<TenantPolicyHook>,
}
352
353impl Default for TenantRepoConfig {
354    fn default() -> Self {
355        Self {
356            tenant_field: "tenant_id".to_string(),
357            require_tenant_context: true,
358            policy_hook: None,
359        }
360    }
361}
362
/// Wrapper around a [`Repo`] whose `*_scoped` methods enforce tenant isolation
/// according to a [`TenantRepoConfig`].
pub struct TenantScopedRepo<R: Repo> {
    /// The wrapped repository that performs the actual storage operations.
    inner: R,
    /// Tenant field name, context requirement and optional policy hook.
    config: TenantRepoConfig,
}
367
368impl<R: Repo> TenantScopedRepo<R> {
369    pub fn new(inner: R) -> Self {
370        Self {
371            inner,
372            config: TenantRepoConfig::default(),
373        }
374    }
375
    /// Replaces the whole configuration (including any policy hook) and returns the repo.
    pub fn with_config(mut self, config: TenantRepoConfig) -> Self {
        self.config = config;
        self
    }
380
381    pub fn with_policy_hook<F>(mut self, policy_hook: F) -> Self
382    where
383        F: Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync + 'static,
384    {
385        self.config.policy_hook = Some(Arc::new(policy_hook));
386        self
387    }
388
    /// Shared access to the wrapped repository (bypasses tenant scoping).
    pub fn inner(&self) -> &R {
        &self.inner
    }
392
    /// Mutable access to the wrapped repository (bypasses tenant scoping).
    pub fn inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }
396
    /// Consumes the wrapper and returns the wrapped repository.
    pub fn into_inner(self) -> R {
        self.inner
    }
400
401    pub fn insert_scoped(
402        &mut self,
403        table: &str,
404        context: &QueryContext,
405        mut data: Row,
406    ) -> DataResult<StoredRow> {
407        let tenant_id = self.required_tenant_id(context)?;
408        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
409        self.ensure_tenant_match(
410            table,
411            TenantRepoOperation::Insert,
412            tenant_id.as_deref(),
413            row_tenant_id.as_deref(),
414            None,
415        )?;
416        if let Some(tenant_id) = tenant_id.as_deref() {
417            data.insert(
418                self.config.tenant_field.clone(),
419                Value::String(tenant_id.to_string()),
420            );
421        }
422        self.inner.insert(table, data)
423    }
424
425    pub fn update_scoped(
426        &mut self,
427        table: &str,
428        id: u64,
429        context: &QueryContext,
430        mut data: Row,
431    ) -> DataResult<StoredRow> {
432        let tenant_id = self.required_tenant_id(context)?;
433        let existing = self.inner.find(table, id)?;
434        if let Some(existing) = existing.as_ref() {
435            self.ensure_tenant_match(
436                table,
437                TenantRepoOperation::Update,
438                tenant_id.as_deref(),
439                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
440                Some(id),
441            )?;
442        }
443        let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
444        self.ensure_tenant_match(
445            table,
446            TenantRepoOperation::Update,
447            tenant_id.as_deref(),
448            row_tenant_id.as_deref(),
449            Some(id),
450        )?;
451        if let Some(tenant_id) = tenant_id.as_deref() {
452            data.insert(
453                self.config.tenant_field.clone(),
454                Value::String(tenant_id.to_string()),
455            );
456        }
457        self.inner.update(table, id, data)
458    }
459
460    pub fn delete_scoped(
461        &mut self,
462        table: &str,
463        id: u64,
464        context: &QueryContext,
465    ) -> DataResult<()> {
466        let tenant_id = self.required_tenant_id(context)?;
467        let existing = self.inner.find(table, id)?;
468        if let Some(existing) = existing.as_ref() {
469            self.ensure_tenant_match(
470                table,
471                TenantRepoOperation::Delete,
472                tenant_id.as_deref(),
473                tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
474                Some(id),
475            )?;
476        }
477        self.inner.delete(table, id)
478    }
479
480    pub fn find_scoped(
481        &self,
482        table: &str,
483        id: u64,
484        context: &QueryContext,
485    ) -> DataResult<Option<StoredRow>> {
486        let tenant_id = self.required_tenant_id(context)?;
487        let row = self.inner.find(table, id)?;
488        if let Some(row) = row.as_ref() {
489            self.ensure_tenant_match(
490                table,
491                TenantRepoOperation::Find,
492                tenant_id.as_deref(),
493                tenant_id_from_row(&row.data, self.config.tenant_field.as_str()).as_deref(),
494                Some(id),
495            )?;
496        }
497        Ok(row)
498    }
499
500    pub fn list_scoped(
501        &self,
502        table: &str,
503        query: &Query,
504        context: &QueryContext,
505    ) -> DataResult<Vec<StoredRow>> {
506        let tenant_id = self.required_tenant_id(context)?;
507        self.evaluate_policy(TenantPolicyContext {
508            table: table.to_string(),
509            operation: TenantRepoOperation::List,
510            tenant_id: tenant_id.clone(),
511            row_tenant_id: None,
512            row_id: None,
513        })?;
514        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
515        self.inner.list(table, &scoped_query)
516    }
517
518    pub fn list_window_scoped(
519        &self,
520        table: &str,
521        query: &Query,
522        context: &QueryContext,
523    ) -> DataResult<WindowPage> {
524        let tenant_id = self.required_tenant_id(context)?;
525        self.evaluate_policy(TenantPolicyContext {
526            table: table.to_string(),
527            operation: TenantRepoOperation::ListWindow,
528            tenant_id: tenant_id.clone(),
529            row_tenant_id: None,
530            row_id: None,
531        })?;
532        let scoped_query = self.scoped_query(query, tenant_id.as_deref());
533        self.inner.list_window(table, &scoped_query)
534    }
535
536    fn scoped_query(&self, query: &Query, tenant_id: Option<&str>) -> Query {
537        let mut scoped = query.clone();
538        if let Some(tenant_id) = tenant_id {
539            scoped.filters.push(Filter::eq(
540                self.config.tenant_field.clone(),
541                Value::String(tenant_id.to_string()),
542            ));
543        }
544        scoped
545    }
546
547    fn required_tenant_id(&self, context: &QueryContext) -> DataResult<Option<String>> {
548        let tenant_id = normalize_tenant_id(context.tenant_id.as_deref());
549        if self.config.require_tenant_context && tenant_id.is_none() {
550            return Err(tenant_error(
551                "tenant_context_required",
552                "tenant context is required for scoped data operation",
553            ));
554        }
555        Ok(tenant_id)
556    }
557
558    fn ensure_tenant_match(
559        &self,
560        table: &str,
561        operation: TenantRepoOperation,
562        tenant_id: Option<&str>,
563        row_tenant_id: Option<&str>,
564        row_id: Option<u64>,
565    ) -> DataResult<()> {
566        let normalized_tenant_id = normalize_tenant_id(tenant_id);
567        let normalized_row_tenant_id = normalize_tenant_id(row_tenant_id);
568        if let Some(expected_tenant) = normalized_tenant_id.as_deref() {
569            if let Some(row_tenant) = normalized_row_tenant_id.as_deref() {
570                if row_tenant != expected_tenant {
571                    return Err(tenant_error(
572                        "tenant_row_mismatch",
573                        format!(
574                            "row tenant does not match tenant context for table `{table}` (row_id={row_id:?})"
575                        ),
576                    ));
577                }
578            }
579        }
580        self.evaluate_policy(TenantPolicyContext {
581            table: table.to_string(),
582            operation,
583            tenant_id: normalized_tenant_id,
584            row_tenant_id: normalized_row_tenant_id,
585            row_id,
586        })
587    }
588
589    fn evaluate_policy(&self, context: TenantPolicyContext) -> DataResult<()> {
590        let Some(policy_hook) = self.config.policy_hook.as_ref() else {
591            return Ok(());
592        };
593        let decision = policy_hook(&context);
594        if decision.allowed {
595            Ok(())
596        } else {
597            Err(tenant_error(
598                decision.code.as_deref().unwrap_or("tenant_policy_denied"),
599                decision
600                    .message
601                    .as_deref()
602                    .unwrap_or("tenant policy denied data operation"),
603            ))
604        }
605    }
606}
607
608fn tenant_error(code: &str, message: impl Into<String>) -> DataError {
609    DataError::Query(format!("[{code}] {}", message.into()))
610}
611
612fn observation_scope_key(table: &str, operation: &str, context: &QueryContext) -> String {
613    let correlation_scope = context
614        .correlation_id()
615        .or_else(|| context.request_id())
616        .or(context.trace_id.as_deref())
617        .unwrap_or("global");
618    format!("{table}:{operation}:{correlation_scope}")
619}
620
/// Trims `tenant_id` and returns it as an owned string; a missing, empty or
/// whitespace-only id yields `None`.
fn normalize_tenant_id(tenant_id: Option<&str>) -> Option<String> {
    let trimmed = tenant_id?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
627
628fn tenant_id_from_row(row: &Row, tenant_field: &str) -> Option<String> {
629    row.get(tenant_field)
630        .and_then(Value::as_str)
631        .and_then(|value| normalize_tenant_id(Some(value)))
632}
633
/// In-memory [`Repo`] that keeps every table as a vector of rows.
pub struct MemoryRepo {
    /// Driver whose kind is reported by `adapter_kind` and in log events.
    driver: Box<dyn AdapterDriver>,
    /// Rows per table name, in insertion order.
    tables: BTreeMap<String, Vec<StoredRow>>,
    /// Id handed to the next inserted row; shared by all tables.
    next_id: u64,
}
639
640impl MemoryRepo {
641    pub fn new(driver: Box<dyn AdapterDriver>) -> Self {
642        Self {
643            driver,
644            tables: BTreeMap::new(),
645            next_id: 1,
646        }
647    }
648
649    pub fn list_observed(
650        &self,
651        table: &str,
652        query: &Query,
653        context: &QueryContext,
654        tracker: &mut QueryObservabilityTracker,
655    ) -> DataResult<(Vec<StoredRow>, QueryObservation)> {
656        let started_at = Instant::now();
657        let rows = self.list(table, query)?;
658        let observation = tracker.observe(
659            table,
660            "list",
661            query,
662            context,
663            started_at.elapsed().as_millis() as u64,
664        );
665        Ok((rows, observation))
666    }
667
668    pub fn list_window_observed(
669        &self,
670        table: &str,
671        query: &Query,
672        context: &QueryContext,
673        tracker: &mut QueryObservabilityTracker,
674    ) -> DataResult<(WindowPage, QueryObservation)> {
675        let started_at = Instant::now();
676        let page = self.list_window(table, query)?;
677        let observation = tracker.observe(
678            table,
679            "list_window",
680            query,
681            context,
682            started_at.elapsed().as_millis() as u64,
683        );
684        Ok((page, observation))
685    }
686}
687
688impl Repo for MemoryRepo {
    /// Reports the kind of the driver this repository was created with.
    fn adapter_kind(&self) -> AdapterKind {
        self.driver.kind()
    }
692
693    fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow> {
694        let started_at = Instant::now();
695        let result = {
696            let entry = self.tables.entry(table.to_string()).or_default();
697            let row = StoredRow {
698                id: self.next_id,
699                data,
700            };
701            self.next_id += 1;
702            entry.push(row.clone());
703            Ok(row)
704        };
705
706        match &result {
707            Ok(row) => info!(
708                target: "shelly.data.query",
709                source = "memory_repo",
710                adapter = self.driver.kind().as_str(),
711                operation = "insert",
712                table,
713                row_id = row.id,
714                duration_ms = started_at.elapsed().as_millis() as u64,
715                "Shelly data query executed"
716            ),
717            Err(err) => warn!(
718                target: "shelly.data.query",
719                source = "memory_repo",
720                adapter = self.driver.kind().as_str(),
721                operation = "insert",
722                table,
723                duration_ms = started_at.elapsed().as_millis() as u64,
724                error = %err,
725                "Shelly data query failed"
726            ),
727        }
728
729        result
730    }
731
732    fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow> {
733        let started_at = Instant::now();
734        let result = {
735            let rows = self.tables.entry(table.to_string()).or_default();
736            match rows.iter_mut().find(|row| row.id == id) {
737                Some(existing) => {
738                    existing.data = data;
739                    Ok(existing.clone())
740                }
741                None => Err(DataError::Query(format!(
742                    "row id {id} not found in table `{table}`"
743                ))),
744            }
745        };
746
747        match &result {
748            Ok(row) => info!(
749                target: "shelly.data.query",
750                source = "memory_repo",
751                adapter = self.driver.kind().as_str(),
752                operation = "update",
753                table,
754                row_id = row.id,
755                duration_ms = started_at.elapsed().as_millis() as u64,
756                "Shelly data query executed"
757            ),
758            Err(err) => warn!(
759                target: "shelly.data.query",
760                source = "memory_repo",
761                adapter = self.driver.kind().as_str(),
762                operation = "update",
763                table,
764                row_id = id,
765                duration_ms = started_at.elapsed().as_millis() as u64,
766                error = %err,
767                "Shelly data query failed"
768            ),
769        }
770
771        result
772    }
773
774    fn delete(&mut self, table: &str, id: u64) -> DataResult<()> {
775        let started_at = Instant::now();
776        let result = {
777            let rows = self.tables.entry(table.to_string()).or_default();
778            let initial_len = rows.len();
779            rows.retain(|row| row.id != id);
780            if rows.len() == initial_len {
781                Err(DataError::Query(format!(
782                    "row id {id} not found in table `{table}`"
783                )))
784            } else {
785                Ok(())
786            }
787        };
788
789        match &result {
790            Ok(()) => info!(
791                target: "shelly.data.query",
792                source = "memory_repo",
793                adapter = self.driver.kind().as_str(),
794                operation = "delete",
795                table,
796                row_id = id,
797                duration_ms = started_at.elapsed().as_millis() as u64,
798                "Shelly data query executed"
799            ),
800            Err(err) => warn!(
801                target: "shelly.data.query",
802                source = "memory_repo",
803                adapter = self.driver.kind().as_str(),
804                operation = "delete",
805                table,
806                row_id = id,
807                duration_ms = started_at.elapsed().as_millis() as u64,
808                error = %err,
809                "Shelly data query failed"
810            ),
811        }
812
813        result
814    }
815
816    fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>> {
817        let started_at = Instant::now();
818        let result = Ok(self
819            .tables
820            .get(table)
821            .and_then(|rows| rows.iter().find(|row| row.id == id))
822            .cloned());
823
824        match &result {
825            Ok(row) => info!(
826                target: "shelly.data.query",
827                source = "memory_repo",
828                adapter = self.driver.kind().as_str(),
829                operation = "find",
830                table,
831                row_id = id,
832                found = row.is_some(),
833                duration_ms = started_at.elapsed().as_millis() as u64,
834                "Shelly data query executed"
835            ),
836            Err(err) => warn!(
837                target: "shelly.data.query",
838                source = "memory_repo",
839                adapter = self.driver.kind().as_str(),
840                operation = "find",
841                table,
842                row_id = id,
843                duration_ms = started_at.elapsed().as_millis() as u64,
844                error = %err,
845                "Shelly data query failed"
846            ),
847        }
848
849        result
850    }
851
852    fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>> {
853        let started_at = Instant::now();
854        let result = {
855            let rows = self.tables.get(table).cloned().unwrap_or_default();
856            let rows = materialize_rows(rows, query);
857            Ok(rows)
858        };
859
860        match &result {
861            Ok(rows) => info!(
862                target: "shelly.data.query",
863                source = "memory_repo",
864                adapter = self.driver.kind().as_str(),
865                operation = "list",
866                table,
867                row_count = rows.len(),
868                filter_count = query.filters.len(),
869                sort_count = query.sorts.len(),
870                page = query.pagination.map(|value| value.page),
871                per_page = query.pagination.map(|value| value.per_page),
872                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
873                wire_format = ?query.wire_format,
874                duration_ms = started_at.elapsed().as_millis() as u64,
875                "Shelly data query executed"
876            ),
877            Err(err) => warn!(
878                target: "shelly.data.query",
879                source = "memory_repo",
880                adapter = self.driver.kind().as_str(),
881                operation = "list",
882                table,
883                filter_count = query.filters.len(),
884                sort_count = query.sorts.len(),
885                page = query.pagination.map(|value| value.page),
886                per_page = query.pagination.map(|value| value.per_page),
887                keyset_limit = query.keyset.as_ref().map(|value| value.limit),
888                wire_format = ?query.wire_format,
889                duration_ms = started_at.elapsed().as_millis() as u64,
890                error = %err,
891                "Shelly data query failed"
892            ),
893        }
894
895        result
896    }
897
898    fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage> {
899        if !query.has_valid_window_token() {
900            return Err(DataError::Query(
901                "window token query fingerprint mismatch".to_string(),
902            ));
903        }
904
905        let started_at = Instant::now();
906        let all_rows = materialize_rows(self.tables.get(table).cloned().unwrap_or_default(), query);
907        let total_rows = all_rows.len();
908        let (offset, limit) = window_range_from_query(query, total_rows);
909        let rows = all_rows
910            .into_iter()
911            .skip(offset)
912            .take(limit.max(1))
913            .collect::<Vec<_>>();
914        let token = query.next_window_token(offset, limit.max(1), now_epoch_ms(), {
915            (offset as u64)
916                ^ (limit as u64)
917                ^ (rows.len() as u64)
918                ^ (total_rows as u64)
919                ^ self.next_id
920        });
921        let compact_rows =
922            (query.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
923        let page = WindowPage {
924            offset,
925            limit: limit.max(1),
926            total_rows,
927            rows,
928            query_fingerprint: query.fingerprint(),
929            token,
930            wire_format: query.wire_format,
931            compact_rows,
932        };
933
934        info!(
935            target: "shelly.data.query",
936            source = "memory_repo",
937            adapter = self.driver.kind().as_str(),
938            operation = "list_window",
939            table,
940            row_count = page.rows.len(),
941            total_rows = page.total_rows,
942            offset = page.offset,
943            limit = page.limit,
944            wire_format = ?page.wire_format,
945            duration_ms = started_at.elapsed().as_millis() as u64,
946            "Shelly data query executed"
947        );
948
949        Ok(page)
950    }
951
952    fn materialize_incremental_diff(
953        &self,
954        previous: &WindowPage,
955        current: &WindowPage,
956    ) -> IncrementalDiff {
957        let full_resync = previous.query_fingerprint != current.query_fingerprint
958            || previous.wire_format != current.wire_format;
959
960        if full_resync {
961            return IncrementalDiff {
962                from_token: Some(previous.token.clone()),
963                to_token: current.token.clone(),
964                full_resync: true,
965                inserted: current.rows.clone(),
966                updated: Vec::new(),
967                removed_ids: Vec::new(),
968            };
969        }
970
971        let previous_rows: HashMap<u64, &StoredRow> =
972            previous.rows.iter().map(|row| (row.id, row)).collect();
973        let current_rows: HashMap<u64, &StoredRow> =
974            current.rows.iter().map(|row| (row.id, row)).collect();
975
976        let mut inserted = Vec::new();
977        let mut updated = Vec::new();
978        for row in &current.rows {
979            match previous_rows.get(&row.id) {
980                None => inserted.push(row.clone()),
981                Some(previous_row) if previous_row.data != row.data => updated.push(row.clone()),
982                _ => {}
983            }
984        }
985
986        let mut removed_ids = previous_rows
987            .keys()
988            .filter(|id| !current_rows.contains_key(id))
989            .copied()
990            .collect::<Vec<_>>();
991        removed_ids.sort_unstable();
992
993        IncrementalDiff {
994            from_token: Some(previous.token.clone()),
995            to_token: current.token.clone(),
996            full_resync: false,
997            inserted,
998            updated,
999            removed_ids,
1000        }
1001    }
1002}
1003
1004impl RepoUnitOfWork for MemoryRepo {
1005    fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
1006    where
1007        F: FnOnce(&mut Self) -> DataResult<T>,
1008    {
1009        let snapshot_tables = self.tables.clone();
1010        let snapshot_next_id = self.next_id;
1011        match operation(self) {
1012            Ok(value) => Ok(value),
1013            Err(err) => {
1014                self.tables = snapshot_tables;
1015                self.next_id = snapshot_next_id;
1016                Err(err)
1017            }
1018        }
1019    }
1020}
1021
1022impl OptimisticLockingRepo for MemoryRepo {
1023    fn update_with_optimistic_lock(
1024        &mut self,
1025        table: &str,
1026        id: u64,
1027        patch: Row,
1028        lock: OptimisticLock,
1029    ) -> DataResult<StoredRow> {
1030        let existing = self.find(table, id)?.ok_or_else(|| {
1031            DataError::Query(format!(
1032                "row id {id} not found in table `{table}` for optimistic lock update"
1033            ))
1034        })?;
1035        let current_version = existing
1036            .data
1037            .get(lock.field.as_str())
1038            .and_then(Value::as_u64)
1039            .ok_or_else(|| {
1040                DataError::Query(format!(
1041                    "row id {id} in `{table}` missing optimistic lock field `{}`",
1042                    lock.field
1043                ))
1044            })?;
1045        if current_version != lock.expected_version {
1046            return Err(DataError::Query(format!(
1047                "optimistic lock conflict on `{table}` row id {id}: expected {} but found {}",
1048                lock.expected_version, current_version
1049            )));
1050        }
1051        let next_version = current_version
1052            .checked_add(lock.increment_by)
1053            .ok_or_else(|| {
1054                DataError::Query(format!(
1055                    "optimistic lock version overflow on `{table}` row id {id}"
1056                ))
1057            })?;
1058
1059        let mut merged = existing.data;
1060        for (key, value) in patch {
1061            merged.insert(key, value);
1062        }
1063        merged.insert(lock.field, Value::from(next_version));
1064        self.update(table, id, merged)
1065    }
1066}
1067
1068fn materialize_rows(mut rows: Vec<StoredRow>, query: &Query) -> Vec<StoredRow> {
1069    if !query.filters.is_empty() {
1070        rows.retain(|row| {
1071            query
1072                .filters
1073                .iter()
1074                .all(|filter| matches_filter(row, filter))
1075        });
1076    }
1077
1078    for sort in query.sorts.iter().rev() {
1079        rows.sort_by(|left, right| compare_for_sort(left, right, sort.field.as_str()));
1080        if sort.direction == SortDirection::Desc {
1081            rows.reverse();
1082        }
1083    }
1084
1085    if let Some(keyset) = &query.keyset {
1086        rows = apply_keyset(rows, keyset);
1087    } else if let Some(pagination) = query.pagination {
1088        let offset = (pagination.page.saturating_sub(1)) * pagination.per_page;
1089        rows = rows
1090            .into_iter()
1091            .skip(offset)
1092            .take(pagination.per_page)
1093            .collect();
1094    }
1095
1096    rows
1097}
1098
1099fn apply_keyset(rows: Vec<StoredRow>, keyset: &crate::query::KeysetPagination) -> Vec<StoredRow> {
1100    let limit = keyset.limit.max(1);
1101    let Some(cursor) = keyset.cursor.as_ref() else {
1102        return rows.into_iter().take(limit).collect();
1103    };
1104
1105    match cursor.direction {
1106        KeysetDirection::Forward => rows
1107            .into_iter()
1108            .filter(|row| compare_row_cursor(row, cursor) == Ordering::Greater)
1109            .take(limit)
1110            .collect(),
1111        KeysetDirection::Backward => {
1112            let mut filtered = rows
1113                .into_iter()
1114                .filter(|row| compare_row_cursor(row, cursor) == Ordering::Less)
1115                .collect::<Vec<_>>();
1116            let keep_from = filtered.len().saturating_sub(limit);
1117            filtered.drain(0..keep_from);
1118            filtered
1119        }
1120    }
1121}
1122
1123fn compare_row_cursor(row: &StoredRow, cursor: &crate::query::KeysetCursor) -> Ordering {
1124    row.data
1125        .get(&cursor.field)
1126        .map(|value| compare_json_values(value, &cursor.value))
1127        .unwrap_or_else(|| row.id.cmp(&cursor.value.as_u64().unwrap_or_default()))
1128}
1129
1130fn compare_json_values(left: &Value, right: &Value) -> Ordering {
1131    match (left, right) {
1132        (Value::Number(_), Value::Number(_)) => {
1133            compare_numbers(left, right).unwrap_or(Ordering::Equal)
1134        }
1135        (Value::String(left), Value::String(right)) => left.cmp(right),
1136        (Value::Bool(left), Value::Bool(right)) => left.cmp(right),
1137        _ => left.to_string().cmp(&right.to_string()),
1138    }
1139}
1140
1141fn window_range_from_query(query: &Query, total_rows: usize) -> (usize, usize) {
1142    if let Some(window) = query.window {
1143        let span = window.span().max(1);
1144        let overscan = window.overscan;
1145        let start = window.start.saturating_sub(overscan);
1146        let limit = span.saturating_add(overscan.saturating_mul(2)).max(1);
1147        if total_rows == 0 {
1148            (0, limit)
1149        } else {
1150            (start.min(total_rows.saturating_sub(1)), limit)
1151        }
1152    } else if let Some(pagination) = query.pagination {
1153        (
1154            (pagination.page.saturating_sub(1)) * pagination.per_page,
1155            pagination.per_page.max(1),
1156        )
1157    } else if let Some(keyset) = &query.keyset {
1158        (
1159            query
1160                .window_token
1161                .as_ref()
1162                .map(|token| token.offset)
1163                .unwrap_or(0),
1164            keyset.limit.max(1),
1165        )
1166    } else {
1167        (
1168            query
1169                .window_token
1170                .as_ref()
1171                .map(|token| token.offset)
1172                .unwrap_or(0),
1173            query
1174                .window_token
1175                .as_ref()
1176                .map(|token| token.limit)
1177                .unwrap_or_else(|| total_rows.max(1)),
1178        )
1179    }
1180}
1181
1182fn encode_compact_rows(rows: &[StoredRow]) -> CompactRowsPayload {
1183    let mut columns = vec!["id".to_string()];
1184    for row in rows {
1185        for key in row.data.keys() {
1186            if !columns.iter().any(|column| column == key) {
1187                columns.push(key.clone());
1188            }
1189        }
1190    }
1191
1192    let mut encoded_rows = Vec::with_capacity(rows.len());
1193    for row in rows {
1194        let mut encoded = Vec::with_capacity(columns.len());
1195        for column in &columns {
1196            if column == "id" {
1197                encoded.push(Value::from(row.id));
1198            } else {
1199                encoded.push(row.data.get(column).cloned().unwrap_or(Value::Null));
1200            }
1201        }
1202        encoded_rows.push(encoded);
1203    }
1204
1205    CompactRowsPayload {
1206        columns,
1207        rows: encoded_rows,
1208    }
1209}
1210
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns zero when the system clock reports a time before the epoch.
fn now_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1217
1218fn matches_filter(row: &StoredRow, filter: &crate::query::Filter) -> bool {
1219    let Some(candidate) = row.data.get(&filter.field) else {
1220        return false;
1221    };
1222    match filter.op {
1223        FilterOperator::Eq => candidate == &filter.value,
1224        FilterOperator::Neq => candidate != &filter.value,
1225        FilterOperator::Contains => candidate
1226            .as_str()
1227            .zip(filter.value.as_str())
1228            .is_some_and(|(left, right)| left.contains(right)),
1229        FilterOperator::Gt => {
1230            compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Greater)
1231        }
1232        FilterOperator::Gte => compare_numbers(candidate, &filter.value)
1233            .is_some_and(|ord| ord == Ordering::Greater || ord == Ordering::Equal),
1234        FilterOperator::Lt => {
1235            compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Less)
1236        }
1237        FilterOperator::Lte => compare_numbers(candidate, &filter.value)
1238            .is_some_and(|ord| ord == Ordering::Less || ord == Ordering::Equal),
1239    }
1240}
1241
1242fn compare_for_sort(left: &StoredRow, right: &StoredRow, field: &str) -> Ordering {
1243    let left_value = left.data.get(field);
1244    let right_value = right.data.get(field);
1245    match (left_value, right_value) {
1246        (Some(Value::Number(left_num)), Some(Value::Number(right_num))) => left_num
1247            .as_f64()
1248            .partial_cmp(&right_num.as_f64())
1249            .unwrap_or(Ordering::Equal),
1250        (Some(Value::String(left_text)), Some(Value::String(right_text))) => {
1251            left_text.cmp(right_text)
1252        }
1253        _ => left.id.cmp(&right.id),
1254    }
1255}
1256
1257fn compare_numbers(left: &Value, right: &Value) -> Option<Ordering> {
1258    left.as_f64()
1259        .zip(right.as_f64())
1260        .and_then(|(left, right)| left.partial_cmp(&right))
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use super::{
1266        adapter_for, DatabaseConfig, MemoryRepo, OptimisticLock, OptimisticLockingRepo,
1267        QueryObservabilityPolicy, QueryObservabilityTracker, Repo, RepoUnitOfWork, Row,
1268        TenantPolicyDecision, TenantRepoOperation, TenantScopedRepo,
1269    };
1270    use crate::{
1271        AdapterKind, DataError, Filter, FilterOperator, Query, QueryContext, SortDirection,
1272        WireFormatProfile,
1273    };
1274    use serde_json::json;
1275
1276    #[test]
1277    fn memory_repo_works_for_adapter_selection() {
1278        let mut repo = MemoryRepo::new(
1279            adapter_for(&DatabaseConfig {
1280                adapter: AdapterKind::Sqlite,
1281                url: None,
1282                url_env: None,
1283            })
1284            .unwrap(),
1285        );
1286        let mut row = Row::new();
1287        row.insert("title".to_string(), json!("Alpha"));
1288        row.insert("score".to_string(), json!(10));
1289        repo.insert("posts", row).unwrap();
1290
1291        let rows = repo
1292            .list(
1293                "posts",
1294                &Query::new()
1295                    .where_filter(Filter::contains("title", "Al"))
1296                    .order_by("score", SortDirection::Desc),
1297            )
1298            .unwrap();
1299        assert_eq!(rows.len(), 1);
1300        assert_eq!(rows[0].data.get("title"), Some(&json!("Alpha")));
1301    }
1302
1303    #[test]
1304    fn adapter_for_rejects_none_and_selects_expected_driver() {
1305        let none_result = adapter_for(&DatabaseConfig {
1306            adapter: AdapterKind::None,
1307            url: None,
1308            url_env: None,
1309        });
1310        assert!(matches!(none_result, Err(DataError::Adapter(_))));
1311
1312        for kind in [
1313            AdapterKind::Postgres,
1314            AdapterKind::MySql,
1315            AdapterKind::Sqlite,
1316            AdapterKind::SingleStore,
1317            AdapterKind::ClickHouse,
1318            AdapterKind::BigQuery,
1319            AdapterKind::OpenSearch,
1320        ] {
1321            let driver = adapter_for(&DatabaseConfig {
1322                adapter: kind,
1323                url: None,
1324                url_env: None,
1325            })
1326            .expect("driver should be created");
1327            assert_eq!(driver.kind(), kind);
1328        }
1329    }
1330
1331    #[test]
1332    fn update_delete_and_find_cover_missing_rows() {
1333        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1334
1335        let mut row = Row::new();
1336        row.insert("title".to_string(), json!("Draft"));
1337        let inserted = repo.insert("posts", row).expect("insert should work");
1338
1339        assert_eq!(
1340            repo.find("posts", inserted.id)
1341                .expect("find should not fail")
1342                .map(|it| it.id),
1343            Some(inserted.id)
1344        );
1345        assert!(repo
1346            .find("posts", 999)
1347            .expect("find should not fail")
1348            .is_none());
1349        assert!(repo
1350            .find("missing_table", inserted.id)
1351            .expect("find should not fail")
1352            .is_none());
1353
1354        let mut updated = Row::new();
1355        updated.insert("title".to_string(), json!("Published"));
1356        let updated_row = repo
1357            .update("posts", inserted.id, updated)
1358            .expect("update should work");
1359        assert_eq!(updated_row.data.get("title"), Some(&json!("Published")));
1360
1361        let update_err = repo
1362            .update("posts", 404, Row::new())
1363            .expect_err("missing row should fail update");
1364        assert!(matches!(update_err, DataError::Query(_)));
1365
1366        repo.delete("posts", inserted.id)
1367            .expect("delete should remove row");
1368        let delete_err = repo
1369            .delete("posts", inserted.id)
1370            .expect_err("deleting missing row should fail");
1371        assert!(matches!(delete_err, DataError::Query(_)));
1372    }
1373
1374    #[test]
1375    fn tenant_scoped_repo_enforces_row_level_isolation() {
1376        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1377        let mut scoped = TenantScopedRepo::new(repo);
1378
1379        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1380        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1381
1382        let mut first = Row::new();
1383        first.insert("name".to_string(), json!("Acme"));
1384        let inserted = scoped
1385            .insert_scoped("accounts", &tenant_a, first)
1386            .expect("insert tenant-a row");
1387
1388        let found = scoped
1389            .find_scoped("accounts", inserted.id, &tenant_a)
1390            .expect("find tenant-a row");
1391        assert!(found.is_some());
1392
1393        let denied = scoped
1394            .find_scoped("accounts", inserted.id, &tenant_b)
1395            .expect_err("cross-tenant find should be denied");
1396        assert!(denied.to_string().contains("tenant_row_mismatch"));
1397    }
1398
1399    #[test]
1400    fn tenant_scoped_repo_requires_tenant_context_by_default() {
1401        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1402        let mut scoped = TenantScopedRepo::new(repo);
1403
1404        let mut row = Row::new();
1405        row.insert("name".to_string(), json!("NoTenant"));
1406        let err = scoped
1407            .insert_scoped("accounts", &QueryContext::default(), row)
1408            .expect_err("missing tenant context should be denied");
1409        assert!(err.to_string().contains("tenant_context_required"));
1410    }
1411
1412    #[test]
1413    fn tenant_scoped_repo_policy_hook_can_reject_operations() {
1414        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1415        let mut scoped = TenantScopedRepo::new(repo).with_policy_hook(|ctx| {
1416            if ctx.operation == TenantRepoOperation::Delete {
1417                TenantPolicyDecision::deny("tenant_delete_denied", "tenant delete denied")
1418            } else {
1419                TenantPolicyDecision::allow()
1420            }
1421        });
1422
1423        let tenant = QueryContext::default().with_tenant_id("tenant-a");
1424        let mut row = Row::new();
1425        row.insert("name".to_string(), json!("Acme"));
1426        let inserted = scoped
1427            .insert_scoped("accounts", &tenant, row)
1428            .expect("insert tenant row");
1429
1430        let err = scoped
1431            .delete_scoped("accounts", inserted.id, &tenant)
1432            .expect_err("policy should reject delete");
1433        assert!(err.to_string().contains("tenant_delete_denied"));
1434    }
1435
1436    #[test]
1437    fn tenant_scoped_repo_update_and_window_list_enforce_scope() {
1438        let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1439        let mut scoped = TenantScopedRepo::new(repo);
1440
1441        let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
1442        let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
1443
1444        let mut first = Row::new();
1445        first.insert("name".to_string(), json!("Acme"));
1446        first.insert("score".to_string(), json!(10));
1447        let inserted = scoped
1448            .insert_scoped("accounts", &tenant_a, first)
1449            .expect("insert tenant-a row");
1450
1451        let mut update = Row::new();
1452        update.insert("name".to_string(), json!("Acme Prime"));
1453        update.insert("score".to_string(), json!(20));
1454        let updated = scoped
1455            .update_scoped("accounts", inserted.id, &tenant_a, update)
1456            .expect("update tenant-a row");
1457        assert_eq!(updated.data.get("tenant_id"), Some(&json!("tenant-a")));
1458
1459        let mut bad_update = Row::new();
1460        bad_update.insert("name".to_string(), json!("Cross"));
1461        bad_update.insert("tenant_id".to_string(), json!("tenant-b"));
1462        let err = scoped
1463            .update_scoped("accounts", inserted.id, &tenant_a, bad_update)
1464            .expect_err("tenant mismatch on row payload should fail");
1465        assert!(err.to_string().contains("tenant_row_mismatch"));
1466
1467        let list = scoped
1468            .list_scoped(
1469                "accounts",
1470                &Query::new().order_by("score", SortDirection::Asc),
1471                &tenant_a,
1472            )
1473            .expect("list scoped");
1474        assert_eq!(list.len(), 1);
1475
1476        let window = scoped
1477            .list_window_scoped(
1478                "accounts",
1479                &Query::new()
1480                    .order_by("score", SortDirection::Asc)
1481                    .window(1, 5, 2),
1482                &tenant_a,
1483            )
1484            .expect("list window scoped");
1485        assert_eq!(window.rows.len(), 1);
1486
1487        let denied = scoped
1488            .find_scoped("accounts", inserted.id, &tenant_b)
1489            .expect_err("cross-tenant find should still fail");
1490        assert!(denied.to_string().contains("tenant_row_mismatch"));
1491    }
1492
1493    #[test]
1494    fn list_applies_filters_sorts_and_pagination() {
1495        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1496
1497        let mut alpha = Row::new();
1498        alpha.insert("title".to_string(), json!("Alpha"));
1499        alpha.insert("score".to_string(), json!(10));
1500        alpha.insert("tag".to_string(), json!("core"));
1501        repo.insert("posts", alpha).expect("insert alpha");
1502
1503        let mut beta = Row::new();
1504        beta.insert("title".to_string(), json!("Beta"));
1505        beta.insert("score".to_string(), json!(20));
1506        beta.insert("tag".to_string(), json!("ops"));
1507        repo.insert("posts", beta).expect("insert beta");
1508
1509        let mut gamma = Row::new();
1510        gamma.insert("title".to_string(), json!("Gamma"));
1511        gamma.insert("score".to_string(), json!(15));
1512        gamma.insert("tag".to_string(), json!(123));
1513        repo.insert("posts", gamma).expect("insert gamma");
1514
1515        let eq_rows = repo
1516            .list(
1517                "posts",
1518                &Query::new().where_filter(Filter::eq("title", json!("Alpha"))),
1519            )
1520            .expect("eq filter");
1521        assert_eq!(eq_rows.len(), 1);
1522        assert_eq!(eq_rows[0].data.get("title"), Some(&json!("Alpha")));
1523
1524        let neq_rows = repo
1525            .list(
1526                "posts",
1527                &Query::new().where_filter(crate::Filter {
1528                    field: "title".to_string(),
1529                    op: FilterOperator::Neq,
1530                    value: json!("Alpha"),
1531                }),
1532            )
1533            .expect("neq filter");
1534        assert_eq!(neq_rows.len(), 2);
1535
1536        let contains_rows = repo
1537            .list(
1538                "posts",
1539                &Query::new().where_filter(Filter::contains("title", "mm")),
1540            )
1541            .expect("contains filter");
1542        assert_eq!(contains_rows.len(), 1);
1543        assert_eq!(contains_rows[0].data.get("title"), Some(&json!("Gamma")));
1544
1545        let contains_non_string_rows = repo
1546            .list(
1547                "posts",
1548                &Query::new().where_filter(Filter::contains("tag", "2")),
1549            )
1550            .expect("contains on mixed type");
1551        assert!(contains_non_string_rows.is_empty());
1552
1553        for (op, expected_titles) in [
1554            (FilterOperator::Gt, vec!["Beta"]),
1555            (FilterOperator::Gte, vec!["Beta", "Gamma"]),
1556            (FilterOperator::Lt, vec!["Alpha"]),
1557            (FilterOperator::Lte, vec!["Alpha", "Gamma"]),
1558        ] {
1559            let rows = repo
1560                .list(
1561                    "posts",
1562                    &Query::new().where_filter(crate::Filter {
1563                        field: "score".to_string(),
1564                        op,
1565                        value: json!(15),
1566                    }),
1567                )
1568                .expect("numeric filter");
1569            let titles: Vec<&str> = rows
1570                .iter()
1571                .map(|row| {
1572                    row.data
1573                        .get("title")
1574                        .and_then(|value| value.as_str())
1575                        .expect("title")
1576                })
1577                .collect();
1578            assert_eq!(titles, expected_titles);
1579        }
1580
1581        let unknown_field_sort = repo
1582            .list(
1583                "posts",
1584                &Query::new()
1585                    .order_by("missing", SortDirection::Desc)
1586                    .paginate(1, 2),
1587            )
1588            .expect("fallback sort");
1589        assert_eq!(unknown_field_sort.len(), 2);
1590        assert_eq!(unknown_field_sort[0].id, 3);
1591        assert_eq!(unknown_field_sort[1].id, 2);
1592
1593        let score_sort = repo
1594            .list(
1595                "posts",
1596                &Query::new()
1597                    .order_by("score", SortDirection::Desc)
1598                    .order_by("title", SortDirection::Asc),
1599            )
1600            .expect("score sort");
1601        let score_titles: Vec<&str> = score_sort
1602            .iter()
1603            .map(|row| {
1604                row.data
1605                    .get("title")
1606                    .and_then(|value| value.as_str())
1607                    .expect("title")
1608            })
1609            .collect();
1610        assert_eq!(score_titles, vec!["Beta", "Gamma", "Alpha"]);
1611    }
1612
1613    #[test]
1614    fn keyset_pagination_supports_forward_and_backward_windows() {
1615        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1616
1617        for score in 1..=6 {
1618            let mut row = Row::new();
1619            row.insert("title".to_string(), json!(format!("R{score}")));
1620            row.insert("score".to_string(), json!(score));
1621            repo.insert("scores", row).expect("insert score row");
1622        }
1623
1624        let forward_rows = repo
1625            .list(
1626                "scores",
1627                &Query::new()
1628                    .order_by("score", SortDirection::Asc)
1629                    .keyset_after("score", json!(2), 3),
1630            )
1631            .expect("keyset forward");
1632        let forward_scores: Vec<i64> = forward_rows
1633            .iter()
1634            .map(|row| {
1635                row.data
1636                    .get("score")
1637                    .and_then(|value| value.as_i64())
1638                    .expect("score")
1639            })
1640            .collect();
1641        assert_eq!(forward_scores, vec![3, 4, 5]);
1642
1643        let backward_rows = repo
1644            .list(
1645                "scores",
1646                &Query::new()
1647                    .order_by("score", SortDirection::Asc)
1648                    .keyset_before("score", json!(5), 2),
1649            )
1650            .expect("keyset backward");
1651        let backward_scores: Vec<i64> = backward_rows
1652            .iter()
1653            .map(|row| {
1654                row.data
1655                    .get("score")
1656                    .and_then(|value| value.as_i64())
1657                    .expect("score")
1658            })
1659            .collect();
1660        assert_eq!(backward_scores, vec![3, 4]);
1661    }
1662
1663    #[test]
1664    fn list_window_emits_tokens_and_compact_payload() {
1665        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1666
1667        for score in 0..300 {
1668            let mut row = Row::new();
1669            row.insert("score".to_string(), json!(score));
1670            row.insert(
1671                "tenant".to_string(),
1672                json!(if score % 2 == 0 { "a" } else { "b" }),
1673            );
1674            repo.insert("accounts", row).expect("insert account");
1675        }
1676
1677        let query = Query::new()
1678            .order_by("score", SortDirection::Asc)
1679            .window(100, 240, 20)
1680            .wire_format(WireFormatProfile::Compact);
1681        let page = repo.list_window("accounts", &query).expect("list window");
1682
1683        assert_eq!(page.total_rows, 300);
1684        assert_eq!(page.offset, 80);
1685        assert_eq!(page.limit, 180);
1686        assert_eq!(page.rows.len(), 180);
1687        assert_eq!(page.query_fingerprint, query.fingerprint());
1688        assert_eq!(page.token.query_fingerprint, query.fingerprint());
1689        let compact = page.compact_rows.expect("compact payload");
1690        assert!(compact.columns.contains(&"id".to_string()));
1691        assert!(compact.columns.contains(&"score".to_string()));
1692        assert_eq!(compact.rows.len(), page.rows.len());
1693    }
1694
1695    #[test]
1696    fn list_window_rejects_mismatched_query_token() {
1697        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1698        let mut row = Row::new();
1699        row.insert("score".to_string(), json!(1));
1700        repo.insert("accounts", row).expect("insert account");
1701
1702        let base = Query::new()
1703            .where_filter(Filter::eq("score", json!(1)))
1704            .paginate(1, 20);
1705        let stale_token = base.next_window_token(0, 20, 42, 1);
1706        let mismatched = Query::new()
1707            .where_filter(Filter::eq("score", json!(2)))
1708            .paginate(1, 20)
1709            .with_window_token(stale_token);
1710
1711        let err = repo
1712            .list_window("accounts", &mismatched)
1713            .expect_err("window token mismatch should fail");
1714        assert!(matches!(err, DataError::Query(_)));
1715        assert!(err.to_string().contains("window token"));
1716    }
1717
1718    #[test]
1719    fn incremental_diff_tracks_insert_update_and_remove() {
1720        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1721
1722        for (title, score) in [("A", 10), ("B", 20), ("C", 30)] {
1723            let mut row = Row::new();
1724            row.insert("title".to_string(), json!(title));
1725            row.insert("score".to_string(), json!(score));
1726            repo.insert("accounts", row).expect("insert");
1727        }
1728
1729        let query = Query::new()
1730            .order_by("score", SortDirection::Asc)
1731            .paginate(1, 10);
1732        let previous = repo.list_window("accounts", &query).expect("previous page");
1733
1734        let mut updated_row = Row::new();
1735        updated_row.insert("title".to_string(), json!("B"));
1736        updated_row.insert("score".to_string(), json!(25));
1737        repo.update("accounts", 2, updated_row).expect("update row");
1738        repo.delete("accounts", 3).expect("delete row");
1739        let mut inserted_row = Row::new();
1740        inserted_row.insert("title".to_string(), json!("D"));
1741        inserted_row.insert("score".to_string(), json!(35));
1742        repo.insert("accounts", inserted_row).expect("insert row");
1743
1744        let current = repo.list_window("accounts", &query).expect("current page");
1745        let diff = repo.materialize_incremental_diff(&previous, &current);
1746
1747        assert!(!diff.full_resync);
1748        assert_eq!(diff.inserted.len(), 1);
1749        assert_eq!(diff.inserted[0].id, 4);
1750        assert_eq!(diff.updated.len(), 1);
1751        assert_eq!(diff.updated[0].id, 2);
1752        assert_eq!(diff.removed_ids, vec![3]);
1753    }
1754
1755    #[test]
1756    fn unit_of_work_rolls_back_on_error() {
1757        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1758        let mut row = Row::new();
1759        row.insert("title".to_string(), json!("before"));
1760        let baseline = repo.insert("posts", row).expect("seed row");
1761        let original_count = repo
1762            .list("posts", &Query::new())
1763            .expect("list before")
1764            .len();
1765
1766        let result: Result<(), DataError> = repo.transaction(|inner| {
1767            let mut patch = Row::new();
1768            patch.insert("title".to_string(), json!("mutated"));
1769            inner.update("posts", baseline.id, patch)?;
1770            Err(DataError::Query("force rollback".to_string()))
1771        });
1772        assert!(result.is_err());
1773
1774        let after = repo
1775            .find("posts", baseline.id)
1776            .expect("find after rollback");
1777        assert_eq!(
1778            after
1779                .as_ref()
1780                .and_then(|row| row.data.get("title"))
1781                .and_then(|value| value.as_str()),
1782            Some("before")
1783        );
1784        assert_eq!(
1785            repo.list("posts", &Query::new()).expect("list after").len(),
1786            original_count
1787        );
1788    }
1789
1790    #[test]
1791    fn optimistic_lock_update_enforces_version_match_and_bumps_version() {
1792        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1793        let mut row = Row::new();
1794        row.insert("title".to_string(), json!("v1"));
1795        row.insert("lock_version".to_string(), json!(1u64));
1796        let inserted = repo.insert("posts", row).expect("seed row");
1797
1798        let mut patch = Row::new();
1799        patch.insert("title".to_string(), json!("v2"));
1800        let updated = repo
1801            .update_with_optimistic_lock(
1802                "posts",
1803                inserted.id,
1804                patch,
1805                OptimisticLock::new("lock_version", 1),
1806            )
1807            .expect("optimistic update should pass");
1808        assert_eq!(updated.data.get("title"), Some(&json!("v2")));
1809        assert_eq!(updated.data.get("lock_version"), Some(&json!(2u64)));
1810
1811        let err = repo
1812            .update_with_optimistic_lock(
1813                "posts",
1814                inserted.id,
1815                Row::new(),
1816                OptimisticLock::new("lock_version", 1),
1817            )
1818            .expect_err("stale version should fail");
1819        assert!(err.to_string().contains("optimistic lock conflict"));
1820    }
1821
1822    #[test]
1823    fn query_observability_tracker_surfaces_slow_and_n_plus_one_hints() {
1824        let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
1825        for index in 0..5 {
1826            let mut row = Row::new();
1827            row.insert("title".to_string(), json!(format!("R{index}")));
1828            repo.insert("posts", row).expect("insert");
1829        }
1830
1831        let query = Query::new().paginate(1, 2);
1832        let context = QueryContext::default()
1833            .with_correlation_id("corr-57")
1834            .with_request_id("req-57");
1835        let mut tracker = QueryObservabilityTracker::new(QueryObservabilityPolicy {
1836            slow_query_threshold_ms: 0,
1837            n_plus_one_window_size: 6,
1838            n_plus_one_repeat_threshold: 2,
1839        });
1840
1841        let (_, first) = repo
1842            .list_observed("posts", &query, &context, &mut tracker)
1843            .expect("first observed list");
1844        assert!(first.slow_query);
1845        assert!(!first.potential_n_plus_one);
1846        assert_eq!(first.correlation_id.as_deref(), Some("corr-57"));
1847        assert_eq!(first.request_id.as_deref(), Some("req-57"));
1848
1849        let (_, second) = repo
1850            .list_observed("posts", &query, &context, &mut tracker)
1851            .expect("second observed list");
1852        assert!(second.potential_n_plus_one);
1853        assert!(second
1854            .hints
1855            .iter()
1856            .any(|hint| hint.contains("possible N+1")));
1857
1858        let query_with_preload = query.clone().preload("author");
1859        let (_, third) = repo
1860            .list_observed("posts", &query_with_preload, &context, &mut tracker)
1861            .expect("observed list with preload");
1862        assert!(!third.potential_n_plus_one);
1863    }
1864}