use crate::{
adapter::{AdapterKind, DatabaseConfig},
error::{DataError, DataResult},
query::{
Filter, FilterOperator, KeysetDirection, Query, SortDirection, WindowToken,
WireFormatProfile,
},
QueryContext,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
sync::Arc,
time::{Instant, SystemTime, UNIX_EPOCH},
};
use tracing::{info, warn};
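/// A single row of column data, keyed by column name.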
pub type Row = BTreeMap<String, Value>;
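/// A persisted row together with the numeric id assigned on insert.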
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StoredRow {
pub id: u64,
pub data: Row,
}
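/// Columnar wire encoding for a window of rows: one shared column list plus a
/// positional value vector per row.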
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactRowsPayload {
pub columns: Vec<String>,
pub rows: Vec<Vec<Value>>,
}
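/// One materialized window of results, carrying the continuation token, the query
/// fingerprint it was produced from, and an optional compact payload.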
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowPage {
pub offset: usize,
pub limit: usize,
pub total_rows: usize,
pub rows: Vec<StoredRow>,
pub query_fingerprint: String,
pub token: WindowToken,
pub wire_format: WireFormatProfile,
pub compact_rows: Option<CompactRowsPayload>,
}
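/// Row-level difference between two [`WindowPage`]s. `full_resync` is set when the pages
/// came from different query fingerprints or wire formats, in which case `inserted`
/// simply contains the entire current page.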
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IncrementalDiff {
pub from_token: Option<WindowToken>,
pub to_token: WindowToken,
pub full_resync: bool,
pub inserted: Vec<StoredRow>,
pub updated: Vec<StoredRow>,
pub removed_ids: Vec<u64>,
}
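/// Version check for [`OptimisticLockingRepo::update_with_optimistic_lock`]: the row's
/// `field` must currently equal `expected_version`, and on success it is advanced by
/// `increment_by`.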
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OptimisticLock {
pub field: String,
pub expected_version: u64,
pub increment_by: u64,
}
impl OptimisticLock {
pub fn new(field: impl Into<String>, expected_version: u64) -> Self {
Self {
field: field.into(),
expected_version,
increment_by: 1,
}
}
pub fn increment_by(mut self, increment_by: u64) -> Self {
self.increment_by = increment_by.max(1);
self
}
}
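/// Thresholds controlling when an observation flags a slow query or a potential N+1 pattern.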
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservabilityPolicy {
pub slow_query_threshold_ms: u64,
pub n_plus_one_window_size: usize,
pub n_plus_one_repeat_threshold: usize,
}
impl Default for QueryObservabilityPolicy {
fn default() -> Self {
Self {
slow_query_threshold_ms: 250,
n_plus_one_window_size: 12,
n_plus_one_repeat_threshold: 3,
}
}
}
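/// Diagnostics for a single executed query, including tracing identifiers and
/// human-readable hints.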
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryObservation {
pub table: String,
pub operation: String,
pub query_fingerprint: String,
pub duration_ms: u64,
pub slow_query: bool,
pub potential_n_plus_one: bool,
pub repeated_fingerprint_count: usize,
pub preloads: Vec<String>,
pub trace_id: Option<String>,
pub correlation_id: Option<String>,
pub request_id: Option<String>,
pub hints: Vec<String>,
}
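/// Tracks recent query fingerprints per table/operation/correlation scope so that repeated
/// identical queries issued without preloads can be flagged as potential N+1 access.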
#[derive(Debug, Default, Clone)]
pub struct QueryObservabilityTracker {
policy: QueryObservabilityPolicy,
recent_fingerprints: HashMap<String, Vec<String>>,
}
impl QueryObservabilityTracker {
pub fn new(policy: QueryObservabilityPolicy) -> Self {
Self {
policy,
recent_fingerprints: HashMap::new(),
}
}
pub fn policy(&self) -> &QueryObservabilityPolicy {
&self.policy
}
pub fn observe(
&mut self,
table: &str,
operation: &str,
query: &Query,
context: &QueryContext,
duration_ms: u64,
) -> QueryObservation {
let fingerprint = query.fingerprint();
let scope_key = observation_scope_key(table, operation, context);
let history = self.recent_fingerprints.entry(scope_key).or_default();
history.push(fingerprint.clone());
if history.len() > self.policy.n_plus_one_window_size {
let drain_count = history.len() - self.policy.n_plus_one_window_size;
history.drain(0..drain_count);
}
let repeated_fingerprint_count = history
.iter()
.filter(|value| *value == &fingerprint)
.count();
let potential_n_plus_one = query.preloads.is_empty()
&& repeated_fingerprint_count >= self.policy.n_plus_one_repeat_threshold;
let slow_query = duration_ms >= self.policy.slow_query_threshold_ms;
let mut hints = Vec::new();
if slow_query {
hints.push(format!(
"slow query ({}ms >= {}ms); consider indexes and narrower filters",
duration_ms, self.policy.slow_query_threshold_ms
));
}
if potential_n_plus_one {
hints.push(format!(
"repeated query fingerprint observed {} times without preloads; possible N+1",
repeated_fingerprint_count
));
}
QueryObservation {
table: table.to_string(),
operation: operation.to_string(),
query_fingerprint: fingerprint,
duration_ms,
slow_query,
potential_n_plus_one,
repeated_fingerprint_count,
preloads: query.preloads.clone(),
trace_id: context.trace_id.clone(),
correlation_id: context.correlation_id().map(ToString::to_string),
request_id: context.request_id().map(ToString::to_string),
hints,
}
}
}
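/// Marker trait implemented by each backend driver; it only reports the [`AdapterKind`]
/// it serves.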
pub trait AdapterDriver: Send + Sync {
fn kind(&self) -> AdapterKind;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct PostgresAdapter;
impl AdapterDriver for PostgresAdapter {
fn kind(&self) -> AdapterKind {
AdapterKind::Postgres
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct MySqlAdapter;
impl AdapterDriver for MySqlAdapter {
fn kind(&self) -> AdapterKind {
AdapterKind::MySql
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteAdapter;
impl AdapterDriver for SqliteAdapter {
fn kind(&self) -> AdapterKind {
AdapterKind::Sqlite
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct SingleStoreDriver;
impl AdapterDriver for SingleStoreDriver {
fn kind(&self) -> AdapterKind {
AdapterKind::SingleStore
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct ClickHouseDriver;
impl AdapterDriver for ClickHouseDriver {
fn kind(&self) -> AdapterKind {
AdapterKind::ClickHouse
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct BigQueryDriver;
impl AdapterDriver for BigQueryDriver {
fn kind(&self) -> AdapterKind {
AdapterKind::BigQuery
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct OpenSearchAdapterDriver;
impl AdapterDriver for OpenSearchAdapterDriver {
fn kind(&self) -> AdapterKind {
AdapterKind::OpenSearch
}
}
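/// Selects the driver matching `config.adapter`, or returns an adapter error when the
/// configured adapter is `none`.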
pub fn adapter_for(config: &DatabaseConfig) -> DataResult<Box<dyn AdapterDriver>> {
match config.adapter {
AdapterKind::Postgres => Ok(Box::new(PostgresAdapter)),
AdapterKind::MySql => Ok(Box::new(MySqlAdapter)),
AdapterKind::Sqlite => Ok(Box::new(SqliteAdapter)),
AdapterKind::SingleStore => Ok(Box::new(SingleStoreDriver)),
AdapterKind::ClickHouse => Ok(Box::new(ClickHouseDriver)),
AdapterKind::BigQuery => Ok(Box::new(BigQueryDriver)),
AdapterKind::OpenSearch => Ok(Box::new(OpenSearchAdapterDriver)),
AdapterKind::None => Err(DataError::Adapter(
"database adapter is `none`; select a supported backend in shelly.data.toml"
.to_string(),
)),
}
}
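/// Core CRUD, listing, windowing, and diffing operations shared by every repository backend.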
pub trait Repo {
fn adapter_kind(&self) -> AdapterKind;
fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow>;
fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow>;
fn delete(&mut self, table: &str, id: u64) -> DataResult<()>;
fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>>;
fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>>;
fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage>;
fn materialize_incremental_diff(
&self,
previous: &WindowPage,
current: &WindowPage,
) -> IncrementalDiff;
}
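/// Adds transactional execution on top of [`Repo`]; implementations roll back the
/// closure's changes when it returns an error.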
pub trait RepoUnitOfWork: Repo {
fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
where
F: FnOnce(&mut Self) -> DataResult<T>;
}
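/// Adds version-checked updates on top of [`Repo`].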
pub trait OptimisticLockingRepo: Repo {
fn update_with_optimistic_lock(
&mut self,
table: &str,
id: u64,
patch: Row,
lock: OptimisticLock,
) -> DataResult<StoredRow>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantRepoOperation {
Insert,
Update,
Delete,
Find,
List,
ListWindow,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyContext {
pub table: String,
pub operation: TenantRepoOperation,
pub tenant_id: Option<String>,
pub row_tenant_id: Option<String>,
pub row_id: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TenantPolicyDecision {
pub allowed: bool,
pub code: Option<String>,
pub message: Option<String>,
}
impl TenantPolicyDecision {
pub fn allow() -> Self {
Self {
allowed: true,
code: None,
message: None,
}
}
pub fn deny(code: impl Into<String>, message: impl Into<String>) -> Self {
Self {
allowed: false,
code: Some(code.into()),
message: Some(message.into()),
}
}
}
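/// Caller-supplied hook that can allow or deny an operation after tenant scoping checks.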
pub type TenantPolicyHook = Arc<dyn Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync>;
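/// Configuration for [`TenantScopedRepo`]: the tenant column name, whether a tenant
/// context is mandatory, and an optional policy hook.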
#[derive(Clone)]
pub struct TenantRepoConfig {
pub tenant_field: String,
pub require_tenant_context: bool,
pub policy_hook: Option<TenantPolicyHook>,
}
impl Default for TenantRepoConfig {
fn default() -> Self {
Self {
tenant_field: "tenant_id".to_string(),
require_tenant_context: true,
policy_hook: None,
}
}
}
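/// Wraps any [`Repo`] and enforces row-level tenant isolation: writes stamp the tenant
/// column from the [`QueryContext`], reads reject rows belonging to another tenant, and
/// lists are filtered to the caller's tenant. An optional policy hook can veto individual
/// operations.
///
/// A minimal usage sketch (paths elided; the tests in this module show compiling examples):
///
/// ```ignore
/// let mut scoped = TenantScopedRepo::new(MemoryRepo::new(Box::new(SqliteAdapter)));
/// let ctx = QueryContext::default().with_tenant_id("tenant-a");
/// let row = scoped.insert_scoped("accounts", &ctx, Row::new())?;
/// let page = scoped.list_window_scoped("accounts", &Query::new(), &ctx)?;
/// ```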
pub struct TenantScopedRepo<R: Repo> {
inner: R,
config: TenantRepoConfig,
}
impl<R: Repo> TenantScopedRepo<R> {
pub fn new(inner: R) -> Self {
Self {
inner,
config: TenantRepoConfig::default(),
}
}
pub fn with_config(mut self, config: TenantRepoConfig) -> Self {
self.config = config;
self
}
pub fn with_policy_hook<F>(mut self, policy_hook: F) -> Self
where
F: Fn(&TenantPolicyContext) -> TenantPolicyDecision + Send + Sync + 'static,
{
self.config.policy_hook = Some(Arc::new(policy_hook));
self
}
pub fn inner(&self) -> &R {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut R {
&mut self.inner
}
pub fn into_inner(self) -> R {
self.inner
}
pub fn insert_scoped(
&mut self,
table: &str,
context: &QueryContext,
mut data: Row,
) -> DataResult<StoredRow> {
let tenant_id = self.required_tenant_id(context)?;
let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
self.ensure_tenant_match(
table,
TenantRepoOperation::Insert,
tenant_id.as_deref(),
row_tenant_id.as_deref(),
None,
)?;
if let Some(tenant_id) = tenant_id.as_deref() {
data.insert(
self.config.tenant_field.clone(),
Value::String(tenant_id.to_string()),
);
}
self.inner.insert(table, data)
}
pub fn update_scoped(
&mut self,
table: &str,
id: u64,
context: &QueryContext,
mut data: Row,
) -> DataResult<StoredRow> {
let tenant_id = self.required_tenant_id(context)?;
let existing = self.inner.find(table, id)?;
if let Some(existing) = existing.as_ref() {
self.ensure_tenant_match(
table,
TenantRepoOperation::Update,
tenant_id.as_deref(),
tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
Some(id),
)?;
}
let row_tenant_id = tenant_id_from_row(&data, self.config.tenant_field.as_str());
self.ensure_tenant_match(
table,
TenantRepoOperation::Update,
tenant_id.as_deref(),
row_tenant_id.as_deref(),
Some(id),
)?;
if let Some(tenant_id) = tenant_id.as_deref() {
data.insert(
self.config.tenant_field.clone(),
Value::String(tenant_id.to_string()),
);
}
self.inner.update(table, id, data)
}
pub fn delete_scoped(
&mut self,
table: &str,
id: u64,
context: &QueryContext,
) -> DataResult<()> {
let tenant_id = self.required_tenant_id(context)?;
let existing = self.inner.find(table, id)?;
if let Some(existing) = existing.as_ref() {
self.ensure_tenant_match(
table,
TenantRepoOperation::Delete,
tenant_id.as_deref(),
tenant_id_from_row(&existing.data, self.config.tenant_field.as_str()).as_deref(),
Some(id),
)?;
}
self.inner.delete(table, id)
}
pub fn find_scoped(
&self,
table: &str,
id: u64,
context: &QueryContext,
) -> DataResult<Option<StoredRow>> {
let tenant_id = self.required_tenant_id(context)?;
let row = self.inner.find(table, id)?;
if let Some(row) = row.as_ref() {
self.ensure_tenant_match(
table,
TenantRepoOperation::Find,
tenant_id.as_deref(),
tenant_id_from_row(&row.data, self.config.tenant_field.as_str()).as_deref(),
Some(id),
)?;
}
Ok(row)
}
pub fn list_scoped(
&self,
table: &str,
query: &Query,
context: &QueryContext,
) -> DataResult<Vec<StoredRow>> {
let tenant_id = self.required_tenant_id(context)?;
self.evaluate_policy(TenantPolicyContext {
table: table.to_string(),
operation: TenantRepoOperation::List,
tenant_id: tenant_id.clone(),
row_tenant_id: None,
row_id: None,
})?;
let scoped_query = self.scoped_query(query, tenant_id.as_deref());
self.inner.list(table, &scoped_query)
}
pub fn list_window_scoped(
&self,
table: &str,
query: &Query,
context: &QueryContext,
) -> DataResult<WindowPage> {
let tenant_id = self.required_tenant_id(context)?;
self.evaluate_policy(TenantPolicyContext {
table: table.to_string(),
operation: TenantRepoOperation::ListWindow,
tenant_id: tenant_id.clone(),
row_tenant_id: None,
row_id: None,
})?;
let scoped_query = self.scoped_query(query, tenant_id.as_deref());
self.inner.list_window(table, &scoped_query)
}
fn scoped_query(&self, query: &Query, tenant_id: Option<&str>) -> Query {
let mut scoped = query.clone();
if let Some(tenant_id) = tenant_id {
scoped.filters.push(Filter::eq(
self.config.tenant_field.clone(),
Value::String(tenant_id.to_string()),
));
}
scoped
}
fn required_tenant_id(&self, context: &QueryContext) -> DataResult<Option<String>> {
let tenant_id = normalize_tenant_id(context.tenant_id.as_deref());
if self.config.require_tenant_context && tenant_id.is_none() {
return Err(tenant_error(
"tenant_context_required",
"tenant context is required for scoped data operation",
));
}
Ok(tenant_id)
}
fn ensure_tenant_match(
&self,
table: &str,
operation: TenantRepoOperation,
tenant_id: Option<&str>,
row_tenant_id: Option<&str>,
row_id: Option<u64>,
) -> DataResult<()> {
let normalized_tenant_id = normalize_tenant_id(tenant_id);
let normalized_row_tenant_id = normalize_tenant_id(row_tenant_id);
if let Some(expected_tenant) = normalized_tenant_id.as_deref() {
if let Some(row_tenant) = normalized_row_tenant_id.as_deref() {
if row_tenant != expected_tenant {
return Err(tenant_error(
"tenant_row_mismatch",
format!(
"row tenant does not match tenant context for table `{table}` (row_id={row_id:?})"
),
));
}
}
}
self.evaluate_policy(TenantPolicyContext {
table: table.to_string(),
operation,
tenant_id: normalized_tenant_id,
row_tenant_id: normalized_row_tenant_id,
row_id,
})
}
fn evaluate_policy(&self, context: TenantPolicyContext) -> DataResult<()> {
let Some(policy_hook) = self.config.policy_hook.as_ref() else {
return Ok(());
};
let decision = policy_hook(&context);
if decision.allowed {
Ok(())
} else {
Err(tenant_error(
decision.code.as_deref().unwrap_or("tenant_policy_denied"),
decision
.message
.as_deref()
.unwrap_or("tenant policy denied data operation"),
))
}
}
}
fn tenant_error(code: &str, message: impl Into<String>) -> DataError {
DataError::Query(format!("[{code}] {}", message.into()))
}
fn observation_scope_key(table: &str, operation: &str, context: &QueryContext) -> String {
let correlation_scope = context
.correlation_id()
.or_else(|| context.request_id())
.or(context.trace_id.as_deref())
.unwrap_or("global");
format!("{table}:{operation}:{correlation_scope}")
}
fn normalize_tenant_id(tenant_id: Option<&str>) -> Option<String> {
tenant_id
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToString::to_string)
}
fn tenant_id_from_row(row: &Row, tenant_field: &str) -> Option<String> {
row.get(tenant_field)
.and_then(Value::as_str)
.and_then(|value| normalize_tenant_id(Some(value)))
}
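/// In-memory [`Repo`] implementation backed by per-table vectors. Ids come from a single
/// monotonically increasing counter shared across tables, and the CRUD/list operations emit
/// `shelly.data.query` tracing events.
///
/// A minimal construction sketch mirroring the tests below:
///
/// ```ignore
/// let driver = adapter_for(&DatabaseConfig {
///     adapter: AdapterKind::Sqlite,
///     url: None,
///     url_env: None,
/// })?;
/// let mut repo = MemoryRepo::new(driver);
/// ```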
pub struct MemoryRepo {
driver: Box<dyn AdapterDriver>,
tables: BTreeMap<String, Vec<StoredRow>>,
next_id: u64,
}
impl MemoryRepo {
pub fn new(driver: Box<dyn AdapterDriver>) -> Self {
Self {
driver,
tables: BTreeMap::new(),
next_id: 1,
}
}
pub fn list_observed(
&self,
table: &str,
query: &Query,
context: &QueryContext,
tracker: &mut QueryObservabilityTracker,
) -> DataResult<(Vec<StoredRow>, QueryObservation)> {
let started_at = Instant::now();
let rows = self.list(table, query)?;
let observation = tracker.observe(
table,
"list",
query,
context,
started_at.elapsed().as_millis() as u64,
);
Ok((rows, observation))
}
pub fn list_window_observed(
&self,
table: &str,
query: &Query,
context: &QueryContext,
tracker: &mut QueryObservabilityTracker,
) -> DataResult<(WindowPage, QueryObservation)> {
let started_at = Instant::now();
let page = self.list_window(table, query)?;
let observation = tracker.observe(
table,
"list_window",
query,
context,
started_at.elapsed().as_millis() as u64,
);
Ok((page, observation))
}
}
impl Repo for MemoryRepo {
fn adapter_kind(&self) -> AdapterKind {
self.driver.kind()
}
fn insert(&mut self, table: &str, data: Row) -> DataResult<StoredRow> {
let started_at = Instant::now();
let result = {
let entry = self.tables.entry(table.to_string()).or_default();
let row = StoredRow {
id: self.next_id,
data,
};
self.next_id += 1;
entry.push(row.clone());
Ok(row)
};
match &result {
Ok(row) => info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "insert",
table,
row_id = row.id,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
),
Err(err) => warn!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "insert",
table,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly data query failed"
),
}
result
}
fn update(&mut self, table: &str, id: u64, data: Row) -> DataResult<StoredRow> {
let started_at = Instant::now();
let result = {
let rows = self.tables.entry(table.to_string()).or_default();
match rows.iter_mut().find(|row| row.id == id) {
Some(existing) => {
existing.data = data;
Ok(existing.clone())
}
None => Err(DataError::Query(format!(
"row id {id} not found in table `{table}`"
))),
}
};
match &result {
Ok(row) => info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "update",
table,
row_id = row.id,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
),
Err(err) => warn!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "update",
table,
row_id = id,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly data query failed"
),
}
result
}
fn delete(&mut self, table: &str, id: u64) -> DataResult<()> {
let started_at = Instant::now();
let result = {
let rows = self.tables.entry(table.to_string()).or_default();
let initial_len = rows.len();
rows.retain(|row| row.id != id);
if rows.len() == initial_len {
Err(DataError::Query(format!(
"row id {id} not found in table `{table}`"
)))
} else {
Ok(())
}
};
match &result {
Ok(()) => info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "delete",
table,
row_id = id,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
),
Err(err) => warn!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "delete",
table,
row_id = id,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly data query failed"
),
}
result
}
fn find(&self, table: &str, id: u64) -> DataResult<Option<StoredRow>> {
let started_at = Instant::now();
let result = Ok(self
.tables
.get(table)
.and_then(|rows| rows.iter().find(|row| row.id == id))
.cloned());
match &result {
Ok(row) => info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "find",
table,
row_id = id,
found = row.is_some(),
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
),
Err(err) => warn!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "find",
table,
row_id = id,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly data query failed"
),
}
result
}
fn list(&self, table: &str, query: &Query) -> DataResult<Vec<StoredRow>> {
let started_at = Instant::now();
let result = {
let rows = self.tables.get(table).cloned().unwrap_or_default();
let rows = materialize_rows(rows, query);
Ok(rows)
};
match &result {
Ok(rows) => info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "list",
table,
row_count = rows.len(),
filter_count = query.filters.len(),
sort_count = query.sorts.len(),
page = query.pagination.map(|value| value.page),
per_page = query.pagination.map(|value| value.per_page),
keyset_limit = query.keyset.as_ref().map(|value| value.limit),
wire_format = ?query.wire_format,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
),
Err(err) => warn!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "list",
table,
filter_count = query.filters.len(),
sort_count = query.sorts.len(),
page = query.pagination.map(|value| value.page),
per_page = query.pagination.map(|value| value.per_page),
keyset_limit = query.keyset.as_ref().map(|value| value.limit),
wire_format = ?query.wire_format,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly data query failed"
),
}
result
}
fn list_window(&self, table: &str, query: &Query) -> DataResult<WindowPage> {
if !query.has_valid_window_token() {
return Err(DataError::Query(
"window token query fingerprint mismatch".to_string(),
));
}
let started_at = Instant::now();
let all_rows = materialize_rows(self.tables.get(table).cloned().unwrap_or_default(), query);
let total_rows = all_rows.len();
let (offset, limit) = window_range_from_query(query, total_rows);
let rows = all_rows
.into_iter()
.skip(offset)
.take(limit.max(1))
.collect::<Vec<_>>();
let checksum = (offset as u64)
    ^ (limit as u64)
    ^ (rows.len() as u64)
    ^ (total_rows as u64)
    ^ self.next_id;
let token = query.next_window_token(offset, limit.max(1), now_epoch_ms(), checksum);
let compact_rows =
(query.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
let page = WindowPage {
offset,
limit: limit.max(1),
total_rows,
rows,
query_fingerprint: query.fingerprint(),
token,
wire_format: query.wire_format,
compact_rows,
};
info!(
target: "shelly.data.query",
source = "memory_repo",
adapter = self.driver.kind().as_str(),
operation = "list_window",
table,
row_count = page.rows.len(),
total_rows = page.total_rows,
offset = page.offset,
limit = page.limit,
wire_format = ?page.wire_format,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly data query executed"
);
Ok(page)
}
fn materialize_incremental_diff(
&self,
previous: &WindowPage,
current: &WindowPage,
) -> IncrementalDiff {
let full_resync = previous.query_fingerprint != current.query_fingerprint
|| previous.wire_format != current.wire_format;
if full_resync {
return IncrementalDiff {
from_token: Some(previous.token.clone()),
to_token: current.token.clone(),
full_resync: true,
inserted: current.rows.clone(),
updated: Vec::new(),
removed_ids: Vec::new(),
};
}
let previous_rows: HashMap<u64, &StoredRow> =
previous.rows.iter().map(|row| (row.id, row)).collect();
let current_rows: HashMap<u64, &StoredRow> =
current.rows.iter().map(|row| (row.id, row)).collect();
let mut inserted = Vec::new();
let mut updated = Vec::new();
for row in &current.rows {
match previous_rows.get(&row.id) {
None => inserted.push(row.clone()),
Some(previous_row) if previous_row.data != row.data => updated.push(row.clone()),
_ => {}
}
}
let mut removed_ids = previous_rows
.keys()
.filter(|id| !current_rows.contains_key(id))
.copied()
.collect::<Vec<_>>();
removed_ids.sort_unstable();
IncrementalDiff {
from_token: Some(previous.token.clone()),
to_token: current.token.clone(),
full_resync: false,
inserted,
updated,
removed_ids,
}
}
}
impl RepoUnitOfWork for MemoryRepo {
fn transaction<T, F>(&mut self, operation: F) -> DataResult<T>
where
F: FnOnce(&mut Self) -> DataResult<T>,
{
let snapshot_tables = self.tables.clone();
let snapshot_next_id = self.next_id;
match operation(self) {
Ok(value) => Ok(value),
Err(err) => {
self.tables = snapshot_tables;
self.next_id = snapshot_next_id;
Err(err)
}
}
}
}
impl OptimisticLockingRepo for MemoryRepo {
fn update_with_optimistic_lock(
&mut self,
table: &str,
id: u64,
patch: Row,
lock: OptimisticLock,
) -> DataResult<StoredRow> {
let existing = self.find(table, id)?.ok_or_else(|| {
DataError::Query(format!(
"row id {id} not found in table `{table}` for optimistic lock update"
))
})?;
let current_version = existing
.data
.get(lock.field.as_str())
.and_then(Value::as_u64)
.ok_or_else(|| {
DataError::Query(format!(
"row id {id} in `{table}` missing optimistic lock field `{}`",
lock.field
))
})?;
if current_version != lock.expected_version {
return Err(DataError::Query(format!(
"optimistic lock conflict on `{table}` row id {id}: expected {} but found {}",
lock.expected_version, current_version
)));
}
let next_version = current_version
.checked_add(lock.increment_by)
.ok_or_else(|| {
DataError::Query(format!(
"optimistic lock version overflow on `{table}` row id {id}"
))
})?;
let mut merged = existing.data;
for (key, value) in patch {
merged.insert(key, value);
}
merged.insert(lock.field, Value::from(next_version));
self.update(table, id, merged)
}
}
fn materialize_rows(mut rows: Vec<StoredRow>, query: &Query) -> Vec<StoredRow> {
if !query.filters.is_empty() {
rows.retain(|row| {
query
.filters
.iter()
.all(|filter| matches_filter(row, filter))
});
}
for sort in query.sorts.iter().rev() {
rows.sort_by(|left, right| compare_for_sort(left, right, sort.field.as_str()));
if sort.direction == SortDirection::Desc {
rows.reverse();
}
}
if let Some(keyset) = &query.keyset {
rows = apply_keyset(rows, keyset);
} else if let Some(pagination) = query.pagination {
let offset = (pagination.page.saturating_sub(1)) * pagination.per_page;
rows = rows
.into_iter()
.skip(offset)
.take(pagination.per_page)
.collect();
}
rows
}
fn apply_keyset(rows: Vec<StoredRow>, keyset: &crate::query::KeysetPagination) -> Vec<StoredRow> {
let limit = keyset.limit.max(1);
let Some(cursor) = keyset.cursor.as_ref() else {
return rows.into_iter().take(limit).collect();
};
match cursor.direction {
KeysetDirection::Forward => rows
.into_iter()
.filter(|row| compare_row_cursor(row, cursor) == Ordering::Greater)
.take(limit)
.collect(),
KeysetDirection::Backward => {
let mut filtered = rows
.into_iter()
.filter(|row| compare_row_cursor(row, cursor) == Ordering::Less)
.collect::<Vec<_>>();
let keep_from = filtered.len().saturating_sub(limit);
filtered.drain(0..keep_from);
filtered
}
}
}
fn compare_row_cursor(row: &StoredRow, cursor: &crate::query::KeysetCursor) -> Ordering {
row.data
.get(&cursor.field)
.map(|value| compare_json_values(value, &cursor.value))
.unwrap_or_else(|| row.id.cmp(&cursor.value.as_u64().unwrap_or_default()))
}
fn compare_json_values(left: &Value, right: &Value) -> Ordering {
match (left, right) {
(Value::Number(_), Value::Number(_)) => {
compare_numbers(left, right).unwrap_or(Ordering::Equal)
}
(Value::String(left), Value::String(right)) => left.cmp(right),
(Value::Bool(left), Value::Bool(right)) => left.cmp(right),
_ => left.to_string().cmp(&right.to_string()),
}
}
fn window_range_from_query(query: &Query, total_rows: usize) -> (usize, usize) {
if let Some(window) = query.window {
let span = window.span().max(1);
let overscan = window.overscan;
let start = window.start.saturating_sub(overscan);
let limit = span.saturating_add(overscan.saturating_mul(2)).max(1);
if total_rows == 0 {
(0, limit)
} else {
(start.min(total_rows.saturating_sub(1)), limit)
}
} else if let Some(pagination) = query.pagination {
(
(pagination.page.saturating_sub(1)) * pagination.per_page,
pagination.per_page.max(1),
)
} else if let Some(keyset) = &query.keyset {
(
query
.window_token
.as_ref()
.map(|token| token.offset)
.unwrap_or(0),
keyset.limit.max(1),
)
} else {
(
query
.window_token
.as_ref()
.map(|token| token.offset)
.unwrap_or(0),
query
.window_token
.as_ref()
.map(|token| token.limit)
.unwrap_or_else(|| total_rows.max(1)),
)
}
}
fn encode_compact_rows(rows: &[StoredRow]) -> CompactRowsPayload {
let mut columns = vec!["id".to_string()];
for row in rows {
for key in row.data.keys() {
if !columns.iter().any(|column| column == key) {
columns.push(key.clone());
}
}
}
let mut encoded_rows = Vec::with_capacity(rows.len());
for row in rows {
let mut encoded = Vec::with_capacity(columns.len());
for column in &columns {
if column == "id" {
encoded.push(Value::from(row.id));
} else {
encoded.push(row.data.get(column).cloned().unwrap_or(Value::Null));
}
}
encoded_rows.push(encoded);
}
CompactRowsPayload {
columns,
rows: encoded_rows,
}
}
fn now_epoch_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as u64)
.unwrap_or_default()
}
fn matches_filter(row: &StoredRow, filter: &crate::query::Filter) -> bool {
let Some(candidate) = row.data.get(&filter.field) else {
return false;
};
match filter.op {
FilterOperator::Eq => candidate == &filter.value,
FilterOperator::Neq => candidate != &filter.value,
FilterOperator::Contains => candidate
.as_str()
.zip(filter.value.as_str())
.is_some_and(|(left, right)| left.contains(right)),
FilterOperator::Gt => {
compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Greater)
}
FilterOperator::Gte => compare_numbers(candidate, &filter.value)
.is_some_and(|ord| ord == Ordering::Greater || ord == Ordering::Equal),
FilterOperator::Lt => {
compare_numbers(candidate, &filter.value).is_some_and(|ord| ord == Ordering::Less)
}
FilterOperator::Lte => compare_numbers(candidate, &filter.value)
.is_some_and(|ord| ord == Ordering::Less || ord == Ordering::Equal),
}
}
fn compare_for_sort(left: &StoredRow, right: &StoredRow, field: &str) -> Ordering {
let left_value = left.data.get(field);
let right_value = right.data.get(field);
match (left_value, right_value) {
(Some(Value::Number(left_num)), Some(Value::Number(right_num))) => left_num
.as_f64()
.partial_cmp(&right_num.as_f64())
.unwrap_or(Ordering::Equal),
(Some(Value::String(left_text)), Some(Value::String(right_text))) => {
left_text.cmp(right_text)
}
_ => left.id.cmp(&right.id),
}
}
fn compare_numbers(left: &Value, right: &Value) -> Option<Ordering> {
left.as_f64()
.zip(right.as_f64())
.and_then(|(left, right)| left.partial_cmp(&right))
}
#[cfg(test)]
mod tests {
use super::{
adapter_for, DatabaseConfig, MemoryRepo, OptimisticLock, OptimisticLockingRepo,
QueryObservabilityPolicy, QueryObservabilityTracker, Repo, RepoUnitOfWork, Row,
TenantPolicyDecision, TenantRepoOperation, TenantScopedRepo,
};
use crate::{
AdapterKind, DataError, Filter, FilterOperator, Query, QueryContext, SortDirection,
WireFormatProfile,
};
use serde_json::json;
#[test]
fn memory_repo_works_for_adapter_selection() {
let mut repo = MemoryRepo::new(
adapter_for(&DatabaseConfig {
adapter: AdapterKind::Sqlite,
url: None,
url_env: None,
})
.unwrap(),
);
let mut row = Row::new();
row.insert("title".to_string(), json!("Alpha"));
row.insert("score".to_string(), json!(10));
repo.insert("posts", row).unwrap();
let rows = repo
.list(
"posts",
&Query::new()
.where_filter(Filter::contains("title", "Al"))
.order_by("score", SortDirection::Desc),
)
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].data.get("title"), Some(&json!("Alpha")));
}
#[test]
fn adapter_for_rejects_none_and_selects_expected_driver() {
let none_result = adapter_for(&DatabaseConfig {
adapter: AdapterKind::None,
url: None,
url_env: None,
});
assert!(matches!(none_result, Err(DataError::Adapter(_))));
for kind in [
AdapterKind::Postgres,
AdapterKind::MySql,
AdapterKind::Sqlite,
AdapterKind::SingleStore,
AdapterKind::ClickHouse,
AdapterKind::BigQuery,
AdapterKind::OpenSearch,
] {
let driver = adapter_for(&DatabaseConfig {
adapter: kind,
url: None,
url_env: None,
})
.expect("driver should be created");
assert_eq!(driver.kind(), kind);
}
}
#[test]
fn update_delete_and_find_cover_missing_rows() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut row = Row::new();
row.insert("title".to_string(), json!("Draft"));
let inserted = repo.insert("posts", row).expect("insert should work");
assert_eq!(
repo.find("posts", inserted.id)
.expect("find should not fail")
.map(|it| it.id),
Some(inserted.id)
);
assert!(repo
.find("posts", 999)
.expect("find should not fail")
.is_none());
assert!(repo
.find("missing_table", inserted.id)
.expect("find should not fail")
.is_none());
let mut updated = Row::new();
updated.insert("title".to_string(), json!("Published"));
let updated_row = repo
.update("posts", inserted.id, updated)
.expect("update should work");
assert_eq!(updated_row.data.get("title"), Some(&json!("Published")));
let update_err = repo
.update("posts", 404, Row::new())
.expect_err("missing row should fail update");
assert!(matches!(update_err, DataError::Query(_)));
repo.delete("posts", inserted.id)
.expect("delete should remove row");
let delete_err = repo
.delete("posts", inserted.id)
.expect_err("deleting missing row should fail");
assert!(matches!(delete_err, DataError::Query(_)));
}
#[test]
fn tenant_scoped_repo_enforces_row_level_isolation() {
let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut scoped = TenantScopedRepo::new(repo);
let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
let mut first = Row::new();
first.insert("name".to_string(), json!("Acme"));
let inserted = scoped
.insert_scoped("accounts", &tenant_a, first)
.expect("insert tenant-a row");
let found = scoped
.find_scoped("accounts", inserted.id, &tenant_a)
.expect("find tenant-a row");
assert!(found.is_some());
let denied = scoped
.find_scoped("accounts", inserted.id, &tenant_b)
.expect_err("cross-tenant find should be denied");
assert!(denied.to_string().contains("tenant_row_mismatch"));
}
#[test]
fn tenant_scoped_repo_requires_tenant_context_by_default() {
let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut scoped = TenantScopedRepo::new(repo);
let mut row = Row::new();
row.insert("name".to_string(), json!("NoTenant"));
let err = scoped
.insert_scoped("accounts", &QueryContext::default(), row)
.expect_err("missing tenant context should be denied");
assert!(err.to_string().contains("tenant_context_required"));
}
#[test]
fn tenant_scoped_repo_policy_hook_can_reject_operations() {
let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut scoped = TenantScopedRepo::new(repo).with_policy_hook(|ctx| {
if ctx.operation == TenantRepoOperation::Delete {
TenantPolicyDecision::deny("tenant_delete_denied", "tenant delete denied")
} else {
TenantPolicyDecision::allow()
}
});
let tenant = QueryContext::default().with_tenant_id("tenant-a");
let mut row = Row::new();
row.insert("name".to_string(), json!("Acme"));
let inserted = scoped
.insert_scoped("accounts", &tenant, row)
.expect("insert tenant row");
let err = scoped
.delete_scoped("accounts", inserted.id, &tenant)
.expect_err("policy should reject delete");
assert!(err.to_string().contains("tenant_delete_denied"));
}
#[test]
fn tenant_scoped_repo_update_and_window_list_enforce_scope() {
let repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut scoped = TenantScopedRepo::new(repo);
let tenant_a = QueryContext::default().with_tenant_id("tenant-a");
let tenant_b = QueryContext::default().with_tenant_id("tenant-b");
let mut first = Row::new();
first.insert("name".to_string(), json!("Acme"));
first.insert("score".to_string(), json!(10));
let inserted = scoped
.insert_scoped("accounts", &tenant_a, first)
.expect("insert tenant-a row");
let mut update = Row::new();
update.insert("name".to_string(), json!("Acme Prime"));
update.insert("score".to_string(), json!(20));
let updated = scoped
.update_scoped("accounts", inserted.id, &tenant_a, update)
.expect("update tenant-a row");
assert_eq!(updated.data.get("tenant_id"), Some(&json!("tenant-a")));
let mut bad_update = Row::new();
bad_update.insert("name".to_string(), json!("Cross"));
bad_update.insert("tenant_id".to_string(), json!("tenant-b"));
let err = scoped
.update_scoped("accounts", inserted.id, &tenant_a, bad_update)
.expect_err("tenant mismatch on row payload should fail");
assert!(err.to_string().contains("tenant_row_mismatch"));
let list = scoped
.list_scoped(
"accounts",
&Query::new().order_by("score", SortDirection::Asc),
&tenant_a,
)
.expect("list scoped");
assert_eq!(list.len(), 1);
let window = scoped
.list_window_scoped(
"accounts",
&Query::new()
.order_by("score", SortDirection::Asc)
.window(1, 5, 2),
&tenant_a,
)
.expect("list window scoped");
assert_eq!(window.rows.len(), 1);
let denied = scoped
.find_scoped("accounts", inserted.id, &tenant_b)
.expect_err("cross-tenant find should still fail");
assert!(denied.to_string().contains("tenant_row_mismatch"));
}
#[test]
fn list_applies_filters_sorts_and_pagination() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut alpha = Row::new();
alpha.insert("title".to_string(), json!("Alpha"));
alpha.insert("score".to_string(), json!(10));
alpha.insert("tag".to_string(), json!("core"));
repo.insert("posts", alpha).expect("insert alpha");
let mut beta = Row::new();
beta.insert("title".to_string(), json!("Beta"));
beta.insert("score".to_string(), json!(20));
beta.insert("tag".to_string(), json!("ops"));
repo.insert("posts", beta).expect("insert beta");
let mut gamma = Row::new();
gamma.insert("title".to_string(), json!("Gamma"));
gamma.insert("score".to_string(), json!(15));
gamma.insert("tag".to_string(), json!(123));
repo.insert("posts", gamma).expect("insert gamma");
let eq_rows = repo
.list(
"posts",
&Query::new().where_filter(Filter::eq("title", json!("Alpha"))),
)
.expect("eq filter");
assert_eq!(eq_rows.len(), 1);
assert_eq!(eq_rows[0].data.get("title"), Some(&json!("Alpha")));
let neq_rows = repo
.list(
"posts",
&Query::new().where_filter(crate::Filter {
field: "title".to_string(),
op: FilterOperator::Neq,
value: json!("Alpha"),
}),
)
.expect("neq filter");
assert_eq!(neq_rows.len(), 2);
let contains_rows = repo
.list(
"posts",
&Query::new().where_filter(Filter::contains("title", "mm")),
)
.expect("contains filter");
assert_eq!(contains_rows.len(), 1);
assert_eq!(contains_rows[0].data.get("title"), Some(&json!("Gamma")));
let contains_non_string_rows = repo
.list(
"posts",
&Query::new().where_filter(Filter::contains("tag", "2")),
)
.expect("contains on mixed type");
assert!(contains_non_string_rows.is_empty());
for (op, expected_titles) in [
(FilterOperator::Gt, vec!["Beta"]),
(FilterOperator::Gte, vec!["Beta", "Gamma"]),
(FilterOperator::Lt, vec!["Alpha"]),
(FilterOperator::Lte, vec!["Alpha", "Gamma"]),
] {
let rows = repo
.list(
"posts",
&Query::new().where_filter(crate::Filter {
field: "score".to_string(),
op,
value: json!(15),
}),
)
.expect("numeric filter");
let titles: Vec<&str> = rows
.iter()
.map(|row| {
row.data
.get("title")
.and_then(|value| value.as_str())
.expect("title")
})
.collect();
assert_eq!(titles, expected_titles);
}
let unknown_field_sort = repo
.list(
"posts",
&Query::new()
.order_by("missing", SortDirection::Desc)
.paginate(1, 2),
)
.expect("fallback sort");
assert_eq!(unknown_field_sort.len(), 2);
assert_eq!(unknown_field_sort[0].id, 3);
assert_eq!(unknown_field_sort[1].id, 2);
let score_sort = repo
.list(
"posts",
&Query::new()
.order_by("score", SortDirection::Desc)
.order_by("title", SortDirection::Asc),
)
.expect("score sort");
let score_titles: Vec<&str> = score_sort
.iter()
.map(|row| {
row.data
.get("title")
.and_then(|value| value.as_str())
.expect("title")
})
.collect();
assert_eq!(score_titles, vec!["Beta", "Gamma", "Alpha"]);
}
#[test]
fn keyset_pagination_supports_forward_and_backward_windows() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
for score in 1..=6 {
let mut row = Row::new();
row.insert("title".to_string(), json!(format!("R{score}")));
row.insert("score".to_string(), json!(score));
repo.insert("scores", row).expect("insert score row");
}
let forward_rows = repo
.list(
"scores",
&Query::new()
.order_by("score", SortDirection::Asc)
.keyset_after("score", json!(2), 3),
)
.expect("keyset forward");
let forward_scores: Vec<i64> = forward_rows
.iter()
.map(|row| {
row.data
.get("score")
.and_then(|value| value.as_i64())
.expect("score")
})
.collect();
assert_eq!(forward_scores, vec![3, 4, 5]);
let backward_rows = repo
.list(
"scores",
&Query::new()
.order_by("score", SortDirection::Asc)
.keyset_before("score", json!(5), 2),
)
.expect("keyset backward");
let backward_scores: Vec<i64> = backward_rows
.iter()
.map(|row| {
row.data
.get("score")
.and_then(|value| value.as_i64())
.expect("score")
})
.collect();
assert_eq!(backward_scores, vec![3, 4]);
}
#[test]
fn list_window_emits_tokens_and_compact_payload() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
for score in 0..300 {
let mut row = Row::new();
row.insert("score".to_string(), json!(score));
row.insert(
"tenant".to_string(),
json!(if score % 2 == 0 { "a" } else { "b" }),
);
repo.insert("accounts", row).expect("insert account");
}
let query = Query::new()
.order_by("score", SortDirection::Asc)
.window(100, 240, 20)
.wire_format(WireFormatProfile::Compact);
let page = repo.list_window("accounts", &query).expect("list window");
assert_eq!(page.total_rows, 300);
assert_eq!(page.offset, 80);
assert_eq!(page.limit, 180);
assert_eq!(page.rows.len(), 180);
assert_eq!(page.query_fingerprint, query.fingerprint());
assert_eq!(page.token.query_fingerprint, query.fingerprint());
let compact = page.compact_rows.expect("compact payload");
assert!(compact.columns.contains(&"id".to_string()));
assert!(compact.columns.contains(&"score".to_string()));
assert_eq!(compact.rows.len(), page.rows.len());
}
#[test]
fn list_window_rejects_mismatched_query_token() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut row = Row::new();
row.insert("score".to_string(), json!(1));
repo.insert("accounts", row).expect("insert account");
let base = Query::new()
.where_filter(Filter::eq("score", json!(1)))
.paginate(1, 20);
let stale_token = base.next_window_token(0, 20, 42, 1);
let mismatched = Query::new()
.where_filter(Filter::eq("score", json!(2)))
.paginate(1, 20)
.with_window_token(stale_token);
let err = repo
.list_window("accounts", &mismatched)
.expect_err("window token mismatch should fail");
assert!(matches!(err, DataError::Query(_)));
assert!(err.to_string().contains("window token"));
}
#[test]
fn incremental_diff_tracks_insert_update_and_remove() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
for (title, score) in [("A", 10), ("B", 20), ("C", 30)] {
let mut row = Row::new();
row.insert("title".to_string(), json!(title));
row.insert("score".to_string(), json!(score));
repo.insert("accounts", row).expect("insert");
}
let query = Query::new()
.order_by("score", SortDirection::Asc)
.paginate(1, 10);
let previous = repo.list_window("accounts", &query).expect("previous page");
let mut updated_row = Row::new();
updated_row.insert("title".to_string(), json!("B"));
updated_row.insert("score".to_string(), json!(25));
repo.update("accounts", 2, updated_row).expect("update row");
repo.delete("accounts", 3).expect("delete row");
let mut inserted_row = Row::new();
inserted_row.insert("title".to_string(), json!("D"));
inserted_row.insert("score".to_string(), json!(35));
repo.insert("accounts", inserted_row).expect("insert row");
let current = repo.list_window("accounts", &query).expect("current page");
let diff = repo.materialize_incremental_diff(&previous, &current);
assert!(!diff.full_resync);
assert_eq!(diff.inserted.len(), 1);
assert_eq!(diff.inserted[0].id, 4);
assert_eq!(diff.updated.len(), 1);
assert_eq!(diff.updated[0].id, 2);
assert_eq!(diff.removed_ids, vec![3]);
}
#[test]
fn unit_of_work_rolls_back_on_error() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut row = Row::new();
row.insert("title".to_string(), json!("before"));
let baseline = repo.insert("posts", row).expect("seed row");
let original_count = repo
.list("posts", &Query::new())
.expect("list before")
.len();
let result: Result<(), DataError> = repo.transaction(|inner| {
let mut patch = Row::new();
patch.insert("title".to_string(), json!("mutated"));
inner.update("posts", baseline.id, patch)?;
Err(DataError::Query("force rollback".to_string()))
});
assert!(result.is_err());
let after = repo
.find("posts", baseline.id)
.expect("find after rollback");
assert_eq!(
after
.as_ref()
.and_then(|row| row.data.get("title"))
.and_then(|value| value.as_str()),
Some("before")
);
assert_eq!(
repo.list("posts", &Query::new()).expect("list after").len(),
original_count
);
}
#[test]
fn optimistic_lock_update_enforces_version_match_and_bumps_version() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
let mut row = Row::new();
row.insert("title".to_string(), json!("v1"));
row.insert("lock_version".to_string(), json!(1u64));
let inserted = repo.insert("posts", row).expect("seed row");
let mut patch = Row::new();
patch.insert("title".to_string(), json!("v2"));
let updated = repo
.update_with_optimistic_lock(
"posts",
inserted.id,
patch,
OptimisticLock::new("lock_version", 1),
)
.expect("optimistic update should pass");
assert_eq!(updated.data.get("title"), Some(&json!("v2")));
assert_eq!(updated.data.get("lock_version"), Some(&json!(2u64)));
let err = repo
.update_with_optimistic_lock(
"posts",
inserted.id,
Row::new(),
OptimisticLock::new("lock_version", 1),
)
.expect_err("stale version should fail");
assert!(err.to_string().contains("optimistic lock conflict"));
}
#[test]
fn query_observability_tracker_surfaces_slow_and_n_plus_one_hints() {
let mut repo = MemoryRepo::new(Box::new(super::SqliteAdapter));
for index in 0..5 {
let mut row = Row::new();
row.insert("title".to_string(), json!(format!("R{index}")));
repo.insert("posts", row).expect("insert");
}
let query = Query::new().paginate(1, 2);
let context = QueryContext::default()
.with_correlation_id("corr-57")
.with_request_id("req-57");
let mut tracker = QueryObservabilityTracker::new(QueryObservabilityPolicy {
slow_query_threshold_ms: 0,
n_plus_one_window_size: 6,
n_plus_one_repeat_threshold: 2,
});
let (_, first) = repo
.list_observed("posts", &query, &context, &mut tracker)
.expect("first observed list");
assert!(first.slow_query);
assert!(!first.potential_n_plus_one);
assert_eq!(first.correlation_id.as_deref(), Some("corr-57"));
assert_eq!(first.request_id.as_deref(), Some("req-57"));
let (_, second) = repo
.list_observed("posts", &query, &context, &mut tracker)
.expect("second observed list");
assert!(second.potential_n_plus_one);
assert!(second
.hints
.iter()
.any(|hint| hint.contains("possible N+1")));
let query_with_preload = query.clone().preload("author");
let (_, third) = repo
.list_observed("posts", &query_with_preload, &context, &mut tracker)
.expect("observed list with preload");
assert!(!third.potential_n_plus_one);
}
}