use crate::{
error::{DataError, DataResult},
query::Query,
repo::{Row, StoredRow},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, HashMap},
fmt,
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use tracing::{info, warn};
/// Classifies an [`IntegrationError`] so callers can choose a reaction
/// (retry, re-authenticate, reject input, ...).
///
/// Serialized in `snake_case` (e.g. `rate_limited`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    /// Short-lived failure; retrying is likely to succeed.
    Transient,
    /// Persistent failure; retrying will not help.
    Permanent,
    /// Authentication or authorization failure.
    Auth,
    /// The upstream throttled the request.
    RateLimited,
    /// The call exceeded its time budget.
    Timeout,
    /// The upstream service is (temporarily) down.
    Unavailable,
    /// The request itself was malformed or invalid.
    InvalidInput,
}
/// Error returned by any external-integration call in this module.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
    /// Which integration produced the error (e.g. "singlestore", "trigger").
    pub source: String,
    /// Broad classification used for retry / handling decisions.
    pub kind: IntegrationErrorKind,
    /// Human-readable description.
    pub message: String,
    /// Optional machine-readable error code (e.g. "job_not_found").
    pub code: Option<String>,
    /// Whether [`run_with_retry`] should attempt the operation again.
    pub retryable: bool,
}
impl IntegrationError {
    /// Creates an error for `source` with the given `kind` and `message`.
    ///
    /// `retryable` is derived from `kind`: transient, rate-limited, timeout,
    /// and unavailable errors are retryable; everything else is not.
    /// `code` starts out as `None`; attach one with [`Self::with_code`].
    pub fn new(
        source: impl Into<String>,
        kind: IntegrationErrorKind,
        message: impl Into<String>,
    ) -> Self {
        // `kind` is `Copy`, so it can be both stored and matched on directly;
        // the original intermediate rebinding was redundant.
        Self {
            source: source.into(),
            kind,
            message: message.into(),
            code: None,
            retryable: matches!(
                kind,
                IntegrationErrorKind::Transient
                    | IntegrationErrorKind::RateLimited
                    | IntegrationErrorKind::Timeout
                    | IntegrationErrorKind::Unavailable
            ),
        }
    }

    /// Attaches a machine-readable error code (builder style).
    pub fn with_code(mut self, code: impl Into<String>) -> Self {
        self.code = Some(code.into());
        self
    }
}
impl fmt::Display for IntegrationError {
    /// Renders as `[source:code] message` when a code is present,
    /// otherwise `[source] message`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.code.as_deref() {
            Some(code) => write!(f, "[{}:{}] {}", self.source, code, self.message),
            None => write!(f, "[{}] {}", self.source, self.message),
        }
    }
}
// No wrapped source error, so the default `Error` methods suffice.
impl std::error::Error for IntegrationError {}
/// Convenience alias for fallible integration calls.
pub type IntegrationResult<T> = Result<T, IntegrationError>;
/// Controls how [`run_with_retry`] retries retryable failures.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Total attempts, including the first one (treated as at least 1).
    pub max_attempts: u32,
    /// Sleep before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound on the exponentially-growing backoff, in milliseconds.
    pub max_backoff_ms: u64,
}
impl RetryPolicy {
pub fn conservative() -> Self {
Self {
max_attempts: 3,
initial_backoff_ms: 50,
max_backoff_ms: 500,
}
}
pub fn never() -> Self {
Self {
max_attempts: 1,
initial_backoff_ms: 0,
max_backoff_ms: 0,
}
}
}
impl Default for RetryPolicy {
fn default() -> Self {
Self::conservative()
}
}
pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
where
F: FnMut(u32) -> IntegrationResult<T>,
{
let attempts = policy.max_attempts.max(1);
let mut backoff_ms = policy.initial_backoff_ms;
for attempt in 1..=attempts {
match operation(attempt) {
Ok(value) => return Ok(value),
Err(err) if !err.retryable || attempt == attempts => return Err(err),
Err(_) => {
if backoff_ms > 0 {
thread::sleep(Duration::from_millis(backoff_ms));
}
backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
}
}
}
Err(IntegrationError::new(
"retry",
IntegrationErrorKind::Transient,
"retry exhausted",
))
}
/// Observer invoked on connection lifecycle transitions.
///
/// Both methods default to a no-op success, so implementors may override
/// only the events they care about.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Called when a connection is established.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }
    /// Called when a connection is torn down.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}
/// Drives registered [`ConnectionLifecycleHook`]s through connect/disconnect.
pub trait ConnectionLifecycle {
    /// Adds a hook to be notified on future lifecycle events.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    /// Notifies all hooks that a connection was established.
    fn connect(&self) -> IntegrationResult<()>;
    /// Notifies all hooks that a connection was torn down.
    fn disconnect(&self) -> IntegrationResult<()>;
}
/// Default [`ConnectionLifecycle`] implementation: an ordered hook registry.
#[derive(Default)]
pub struct LifecycleHooks {
    // Hooks fire in registration order; the first error aborts the sequence.
    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}
impl LifecycleHooks {
pub fn new() -> Self {
Self::default()
}
}
impl ConnectionLifecycle for LifecycleHooks {
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
        self.hooks.push(hook);
    }

    /// Runs every hook's `on_connect` in registration order,
    /// short-circuiting on the first failure.
    fn connect(&self) -> IntegrationResult<()> {
        self.hooks.iter().try_for_each(|hook| hook.on_connect())
    }

    /// Runs every hook's `on_disconnect` in registration order,
    /// short-circuiting on the first failure.
    fn disconnect(&self) -> IntegrationResult<()> {
        self.hooks.iter().try_for_each(|hook| hook.on_disconnect())
    }
}
/// Per-call metadata propagated into integration queries, mainly for logging.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    /// Tenant issuing the request, if known.
    pub tenant_id: Option<String>,
    /// Distributed-tracing correlation id, if known.
    pub trace_id: Option<String>,
    /// Free-form key/value annotations (only the count is logged).
    pub tags: BTreeMap<String, String>,
}
/// Typed request/response boundary that every query-style adapter implements.
///
/// Higher-level traits ([`SingleStoreAdapter`], [`OpenSearchAdapter`]) pin
/// the associated types and layer logging on top of `execute`.
pub trait TypedQueryBoundary {
    /// Input type of the boundary.
    type Request: Send + Sync;
    /// Output type of the boundary.
    type Response: Send + Sync;
    /// Executes the request within the given context.
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
/// A SQL statement plus its positional parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
    /// The SQL text to execute.
    pub statement: String,
    /// Positional bind parameters as JSON values.
    pub params: Vec<Value>,
}
impl SqlCommand {
pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
Self {
statement: statement.into(),
params,
}
}
}
/// SQL boundary for SingleStore-backed adapters.
///
/// Implementors only provide [`TypedQueryBoundary::execute`]; the provided
/// `run_query` wraps it with structured tracing on both outcomes.
pub trait SingleStoreAdapter:
    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
{
    /// Executes `query` and emits a structured log record (target
    /// `shelly.integration.query`) with timing, context metadata, and
    /// either the row count (success) or the error (failure).
    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
        let started_at = Instant::now();
        // Captured up front so both log branches can reference them.
        let statement = query.statement.as_str();
        let param_count = query.params.len();
        let result = self.execute(&query, context);
        match &result {
            Ok(rows) => info!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement,
                param_count,
                row_count = rows.len(),
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement,
                param_count,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }
}
/// Full-text search request with optional filters and 1-based pagination.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    /// Index to search in.
    pub index: String,
    /// Free-text query string.
    pub text: String,
    /// Additional structured filters.
    pub filters: Vec<crate::query::Filter>,
    /// 1-based page number.
    pub page: usize,
    /// Maximum rows per page.
    pub per_page: usize,
}
impl SearchRequest {
pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
Self {
index: index.into(),
text: text.into(),
filters: Vec::new(),
page: 1,
per_page: 25,
}
}
pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
self.filters.push(filter);
self
}
}
/// Result of a search: the matched page of rows plus the pre-pagination total.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Total matches across all pages.
    pub total_hits: usize,
    /// Rows on the requested page.
    pub rows: Vec<StoredRow>,
}
/// Search boundary for OpenSearch-backed adapters.
///
/// Implementors only provide [`TypedQueryBoundary::execute`]; the provided
/// `search` wraps it with structured tracing on both outcomes.
pub trait OpenSearchAdapter:
    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
{
    /// Executes `request` and emits a structured log record (target
    /// `shelly.integration.query`) with timing, context metadata, and
    /// either hit counts (success) or the error (failure).
    fn search(
        &self,
        request: SearchRequest,
        context: &QueryContext,
    ) -> IntegrationResult<SearchResponse> {
        let started_at = Instant::now();
        // Captured up front so both log branches can reference them.
        let index = request.index.as_str();
        let text = request.text.as_str();
        let filter_count = request.filters.len();
        let page = request.page;
        let per_page = request.per_page;
        let result = self.execute(&request, context);
        match &result {
            Ok(response) => info!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                row_count = response.rows.len(),
                total_hits = response.total_hits,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }
}
/// Destination for fire-and-forget analytics events.
pub trait AnalyticsSink: Send + Sync {
    /// Delivers a single event to the sink.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
/// A namespaced analytics event with an arbitrary JSON payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
    /// Logical grouping of the event (e.g. "sales").
    pub namespace: String,
    /// Event name within the namespace (e.g. "query_executed").
    pub name: String,
    /// Arbitrary JSON payload.
    pub payload: Value,
}
impl AnalyticsEvent {
pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
Self {
namespace: namespace.into(),
name: name.into(),
payload,
}
}
}
/// Request to enqueue a background workflow job.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
    /// Workflow identifier to run.
    pub workflow: String,
    /// Arbitrary JSON input for the workflow.
    pub payload: Value,
    /// Caller-supplied key for deduplicating submissions.
    pub idempotency_key: String,
    /// Free-form key/value annotations.
    pub metadata: BTreeMap<String, String>,
}
impl JobRequest {
pub fn new(
workflow: impl Into<String>,
payload: Value,
idempotency_key: impl Into<String>,
) -> Self {
Self {
workflow: workflow.into(),
payload,
idempotency_key: idempotency_key.into(),
metadata: BTreeMap::new(),
}
}
}
/// Lifecycle state of a background job.
///
/// `Succeeded`, `Failed`, and `Canceled` are terminal (see
/// `InMemoryJobOrchestrator::is_terminal`). Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    /// Accepted but not yet started.
    Queued,
    /// Currently executing.
    Running,
    /// Finished successfully (terminal).
    Succeeded,
    /// Finished with an error (terminal).
    Failed,
    /// Canceled before completion (terminal).
    Canceled,
}
/// Point-in-time snapshot of a job's progress.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    /// Orchestrator-assigned job id.
    pub id: String,
    /// Current lifecycle state.
    pub state: JobState,
    /// Number of status updates applied so far.
    pub attempts: u32,
    /// Workflow output, present once the job succeeded.
    pub result: Option<Value>,
    /// Failure details, present once the job failed.
    pub error: Option<IntegrationError>,
}
/// Reference returned by [`JobOrchestrator::enqueue`] for later lookups.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    /// Orchestrator-assigned job id.
    pub id: String,
    /// Workflow the job runs.
    pub workflow: String,
    /// Idempotency key from the original request.
    pub idempotency_key: String,
}
/// Shared callback invoked when a job reaches a terminal-ish update.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
/// Contract for enqueueing and tracking background workflow jobs.
pub trait JobOrchestrator: Send + Sync {
    /// Submits a job and returns a handle for later status lookups.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the current status of the job with the given id.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Polls `status` up to `attempts` times, sleeping `backoff_ms`
    /// between polls, returning the last observed status.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    /// Registers a callback to be invoked on job completion updates.
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
/// Test/reference [`SingleStoreAdapter`] that returns a canned row set.
#[derive(Debug, Clone, Default)]
pub struct InMemorySingleStoreAdapter {
    // Rows returned (cloned) for every successful query.
    rows: Vec<Row>,
}
impl InMemorySingleStoreAdapter {
pub fn new(rows: Vec<Row>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemorySingleStoreAdapter {
    type Request = SqlCommand;
    type Response = Vec<Row>;

    /// Rejects blank statements, otherwise returns a clone of the canned rows.
    fn execute(
        &self,
        request: &Self::Request,
        _context: &QueryContext,
    ) -> IntegrationResult<Self::Response> {
        let statement = request.statement.trim();
        if statement.is_empty() {
            let err = IntegrationError::new(
                "singlestore",
                IntegrationErrorKind::InvalidInput,
                "empty SQL statement",
            )
            .with_code("empty_statement");
            return Err(err);
        }
        Ok(self.rows.clone())
    }
}
// Marker impl: inherits the logging-instrumented `run_query` default.
impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
/// Test/reference [`OpenSearchAdapter`] over an in-memory row set.
#[derive(Debug, Clone, Default)]
pub struct InMemoryOpenSearchAdapter {
    // Corpus searched by `execute`.
    rows: Vec<StoredRow>,
}
impl InMemoryOpenSearchAdapter {
pub fn new(rows: Vec<StoredRow>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
type Request = SearchRequest;
type Response = SearchResponse;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.index.trim().is_empty() {
return Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::InvalidInput,
"search index must not be empty",
)
.with_code("empty_index"));
}
let needle = request.text.trim().to_lowercase();
let mut filtered = self
.rows
.iter()
.filter(|row| {
if needle.is_empty() {
return true;
}
row.data.values().any(|value| {
value
.as_str()
.map(|text| text.to_lowercase().contains(&needle))
.unwrap_or(false)
})
})
.cloned()
.collect::<Vec<_>>();
let total_hits = filtered.len();
let page = request.page.max(1);
let per_page = request.per_page.max(1);
let offset = (page - 1) * per_page;
filtered = filtered.into_iter().skip(offset).take(per_page).collect();
Ok(SearchResponse {
total_hits,
rows: filtered,
})
}
}
impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
/// Test/reference [`AnalyticsSink`] that buffers events in memory.
///
/// Clones share the same buffer via `Arc`.
#[derive(Debug, Default, Clone)]
pub struct InMemoryAxiomSink {
    // Recorded events, in delivery order.
    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
}
impl InMemoryAxiomSink {
    /// Snapshot of all recorded events, in delivery order.
    ///
    /// Best-effort: returns an empty vector if the lock is poisoned.
    pub fn events(&self) -> Vec<AnalyticsEvent> {
        match self.events.lock() {
            Ok(events) => events.clone(),
            Err(_) => Vec::new(),
        }
    }
}
impl AnalyticsSink for InMemoryAxiomSink {
    /// Appends the event to the buffer, then logs the delivery.
    ///
    /// # Errors
    ///
    /// Returns an `Unavailable` error if the buffer's lock is poisoned.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
        let started_at = Instant::now();
        // Cloned before `event` is moved into the buffer so the log record
        // below can still reference the fields.
        let namespace = event.namespace.clone();
        let name = event.name.clone();
        self.events
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "axiom",
                    IntegrationErrorKind::Unavailable,
                    "analytics sink lock poisoned",
                )
            })?
            .push(event);
        // Logged only after the push succeeded.
        info!(
            target: "shelly.integration.query",
            source = "axiom",
            operation = "send_event",
            namespace,
            event_name = name,
            duration_ms = started_at.elapsed().as_millis() as u64,
            "Shelly integration call executed"
        );
        Ok(())
    }
}
/// Test/reference [`JobOrchestrator`] backed by in-memory state.
#[derive(Default)]
pub struct InMemoryJobOrchestrator {
    // Job id -> latest status snapshot.
    statuses: Mutex<HashMap<String, JobStatus>>,
    // Completion callbacks, invoked in registration order.
    callbacks: Mutex<Vec<JobCompletionCallback>>,
    // Monotonic counter used to mint `job-N` ids.
    next_id: Mutex<u64>,
}
impl InMemoryJobOrchestrator {
    /// Transitions the job to `Succeeded` with `result` and notifies
    /// registered completion callbacks.
    ///
    /// Also bumps the job's `attempts` counter (see `with_status_mut`).
    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
        let status = self.with_status_mut(id, |status| {
            status.state = JobState::Succeeded;
            status.result = Some(result);
            status.error = None;
        })?;
        self.notify(&status);
        Ok(())
    }

    /// Transitions the job to `Failed` with `error` and notifies
    /// registered completion callbacks.
    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
        let status = self.with_status_mut(id, |status| {
            status.state = JobState::Failed;
            status.result = None;
            status.error = Some(error);
        })?;
        self.notify(&status);
        Ok(())
    }

    /// Applies `update` to the stored status for `id` and returns a clone
    /// of the updated snapshot.
    ///
    /// Increments `attempts` on every call, before `update` runs. The
    /// returned clone lets callers notify callbacks AFTER the statuses
    /// lock is released (avoiding callback re-entrancy under the lock).
    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
    where
        F: FnOnce(&mut JobStatus),
    {
        let mut statuses = self.statuses.lock().map_err(|_| {
            IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "job status lock poisoned",
            )
        })?;
        let Some(status) = statuses.get_mut(id) else {
            return Err(IntegrationError::new(
                "trigger",
                IntegrationErrorKind::InvalidInput,
                "job not found",
            )
            .with_code("job_not_found"));
        };
        status.attempts = status.attempts.saturating_add(1);
        update(status);
        Ok(status.clone())
    }

    /// Invokes every registered callback with the given status snapshot.
    /// Best-effort: silently skips notification if the lock is poisoned.
    fn notify(&self, status: &JobStatus) {
        if let Ok(callbacks) = self.callbacks.lock() {
            for callback in callbacks.iter() {
                callback(status);
            }
        }
    }

    /// Whether `state` is final (no further transitions expected).
    fn is_terminal(state: JobState) -> bool {
        matches!(
            state,
            JobState::Succeeded | JobState::Failed | JobState::Canceled
        )
    }
}
impl JobOrchestrator for InMemoryJobOrchestrator {
    /// Mints a fresh `job-N` id, stores a `Queued` status, and returns a
    /// handle carrying the workflow and idempotency key.
    ///
    /// # Errors
    ///
    /// Returns an `Unavailable` error if either internal lock is poisoned.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
        let started_at = Instant::now();
        // Cloned before `request` fields are moved into the handle so the
        // log record below can still reference them.
        let workflow = request.workflow.clone();
        let idempotency_key = request.idempotency_key.clone();
        let mut next_id = self.next_id.lock().map_err(|_| {
            IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "job id lock poisoned",
            )
        })?;
        *next_id = next_id.saturating_add(1);
        let id = format!("job-{next_id}");
        let status = JobStatus {
            id: id.clone(),
            state: JobState::Queued,
            attempts: 0,
            result: None,
            error: None,
        };
        self.statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .insert(id.clone(), status);
        let handle = JobHandle {
            id,
            workflow: request.workflow,
            idempotency_key: request.idempotency_key,
        };
        info!(
            target: "shelly.integration.query",
            source = "trigger",
            operation = "enqueue",
            workflow,
            idempotency_key,
            job_id = handle.id.as_str(),
            duration_ms = started_at.elapsed().as_millis() as u64,
            "Shelly integration call executed"
        );
        Ok(handle)
    }

    /// Returns a clone of the stored status for `id`, logging the outcome.
    ///
    /// # Errors
    ///
    /// `Unavailable` if the lock is poisoned; `InvalidInput` (code
    /// `job_not_found`) if the id is unknown.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
        let started_at = Instant::now();
        let result = self
            .statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .get(id)
            .cloned()
            .ok_or_else(|| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::InvalidInput,
                    "job not found",
                )
                .with_code("job_not_found")
            });
        match &result {
            Ok(status) => info!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                job_state = ?status.state,
                attempts = status.attempts,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration call executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration call failed"
            ),
        }
        result
    }

    /// Polls `status` up to `attempts` times (at least once), sleeping
    /// `backoff_ms` between non-terminal observations, and returns the
    /// last observed status — terminal or not.
    ///
    /// # Errors
    ///
    /// Propagates the first error from `status` (which logs it itself).
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
        let started_at = Instant::now();
        let attempts = attempts.max(1);
        for current in 1..=attempts {
            let status = self.status(id)?;
            if Self::is_terminal(status.state) || current == attempts {
                info!(
                    target: "shelly.integration.query",
                    source = "trigger",
                    operation = "poll",
                    job_id = id,
                    attempts = current,
                    backoff_ms,
                    terminal = Self::is_terminal(status.state),
                    job_state = ?status.state,
                    duration_ms = started_at.elapsed().as_millis() as u64,
                    "Shelly integration call executed"
                );
                return Ok(status);
            }
            if backoff_ms > 0 {
                thread::sleep(Duration::from_millis(backoff_ms));
            }
        }
        // `attempts >= 1` and the final iteration always returns (the
        // `current == attempts` guard), while status errors propagate via
        // `?`. The original version had a second, unreachable status fetch
        // plus log block here; it has been removed.
        unreachable!("poll loop always returns within `attempts` iterations")
    }

    /// Adds a callback to be invoked on completion updates; best-effort
    /// (logs a warning and drops the callback if the lock is poisoned).
    fn register_completion_callback(&self, callback: JobCompletionCallback) {
        if let Ok(mut callbacks) = self.callbacks.lock() {
            callbacks.push(callback);
            info!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "register_completion_callback",
                callback_count = callbacks.len(),
                "Shelly integration callback registered"
            );
        } else {
            warn!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "register_completion_callback",
                "Shelly integration callback registration failed"
            );
        }
    }
}
/// Converts an [`IntegrationError`] into a [`DataError`], prefixing the
/// message with the caller-supplied source label.
pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
    let source = source.into();
    let message = format!("[{}] {}", source, err);
    DataError::Integration(message)
}
/// Lifts an [`IntegrationResult`] into a [`DataResult`], converting any
/// error via [`map_integration_error`].
pub fn map_integration_result<T>(
    source: impl Into<String>,
    result: IntegrationResult<T>,
) -> DataResult<T> {
    match result {
        Ok(value) => Ok(value),
        Err(err) => Err(map_integration_error(source, err)),
    }
}
/// Translates a [`SearchRequest`] into a [`Query`], carrying over its
/// pagination and applying every filter in order.
pub fn query_from_search(request: &SearchRequest) -> Query {
    let base = Query::new().paginate(request.page, request.per_page);
    request
        .filters
        .iter()
        .cloned()
        .fold(base, |query, filter| query.where_filter(filter))
}
#[cfg(test)]
mod tests {
    use super::{
        map_integration_result, run_with_retry, AnalyticsEvent, AnalyticsSink, ConnectionLifecycle,
        ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
        SingleStoreAdapter, SqlCommand,
    };
    use serde_json::{json, Value};
    use std::sync::{Arc, Mutex};

    /// Hook that counts how often each lifecycle event fires.
    struct CountingHook {
        connects: Arc<Mutex<u32>>,
        disconnects: Arc<Mutex<u32>>,
    }
    impl ConnectionLifecycleHook for CountingHook {
        fn on_connect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.connects.lock().unwrap();
            *guard += 1;
            Ok(())
        }
        fn on_disconnect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.disconnects.lock().unwrap();
            *guard += 1;
            Ok(())
        }
    }

    // Registered hooks fire exactly once per connect/disconnect.
    #[test]
    fn lifecycle_hooks_are_called() {
        let connects = Arc::new(Mutex::new(0));
        let disconnects = Arc::new(Mutex::new(0));
        let mut lifecycle = LifecycleHooks::new();
        lifecycle.register_hook(Arc::new(CountingHook {
            connects: connects.clone(),
            disconnects: disconnects.clone(),
        }));
        lifecycle.connect().unwrap();
        lifecycle.disconnect().unwrap();
        assert_eq!(*connects.lock().unwrap(), 1);
        assert_eq!(*disconnects.lock().unwrap(), 1);
    }

    // Transient errors are retried until an attempt succeeds.
    #[test]
    fn retry_policy_retries_transient_errors() {
        let mut calls = 0u32;
        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            if attempt < 3 {
                Err(IntegrationError::new(
                    "opensearch",
                    IntegrationErrorKind::Transient,
                    "temporary failure",
                ))
            } else {
                Ok("ok")
            }
        })
        .unwrap();
        assert_eq!(result, "ok");
        assert_eq!(calls, 3);
    }

    // Non-retryable (Permanent) errors abort on the first attempt.
    #[test]
    fn retry_policy_stops_on_permanent_errors() {
        let mut calls = 0u32;
        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            Err::<(), IntegrationError>(IntegrationError::new(
                "singlestore",
                IntegrationErrorKind::Permanent,
                "invalid sql",
            ))
        })
        .unwrap_err();
        assert_eq!(calls, 1);
        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
    }

    // map_integration_result preserves the message inside DataError.
    #[test]
    fn integration_result_maps_into_data_error() {
        let mapped = map_integration_result::<()>(
            "trigger",
            Err(IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "service unavailable",
            )),
        )
        .unwrap_err();
        assert!(mapped.to_string().contains("service unavailable"));
    }

    /// Minimal JobOrchestrator used to exercise the trait contract
    /// independently of InMemoryJobOrchestrator.
    #[derive(Default)]
    struct InMemoryJobs {
        statuses: Arc<Mutex<Vec<JobStatus>>>,
        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
    }
    impl JobOrchestrator for InMemoryJobs {
        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
            // Ids derive from the idempotency key, unlike the counter-based
            // reference implementation.
            let id = format!("job-{}", request.idempotency_key);
            self.statuses.lock().unwrap().push(JobStatus {
                id: id.clone(),
                state: JobState::Queued,
                attempts: 0,
                result: None,
                error: None,
            });
            Ok(JobHandle {
                id,
                workflow: request.workflow,
                idempotency_key: request.idempotency_key,
            })
        }
        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
            self.statuses
                .lock()
                .unwrap()
                .iter()
                .find(|status| status.id == id)
                .cloned()
                .ok_or_else(|| {
                    IntegrationError::new(
                        "trigger",
                        IntegrationErrorKind::InvalidInput,
                        "job not found",
                    )
                })
        }
        fn poll(
            &self,
            id: &str,
            _attempts: u32,
            _backoff_ms: u64,
        ) -> super::IntegrationResult<JobStatus> {
            self.status(id)
        }
        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
            self.callbacks.lock().unwrap().push(callback);
        }
    }

    // The trait contract: enqueue yields a handle whose status is Queued.
    #[test]
    fn job_orchestration_contract_supports_enqueue_and_status() {
        let jobs = InMemoryJobs::default();
        let handle = jobs
            .enqueue(JobRequest::new(
                "sync_customer",
                json!({"id": 42}),
                "idempotent-42",
            ))
            .unwrap();
        let status = jobs.status(&handle.id).unwrap();
        assert_eq!(status.state, JobState::Queued);
        assert_eq!(handle.idempotency_key, "idempotent-42");
        // Struct-update syntax keeps the remaining context fields defaulted.
        let ctx = QueryContext {
            tenant_id: Some("tenant-a".to_string()),
            ..QueryContext::default()
        };
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
    }

    // run_query on the reference adapter returns the canned rows.
    #[test]
    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
            "region".to_string(),
            Value::String("EMEA".to_string()),
        )])]);
        let rows = adapter
            .run_query(
                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(
            rows[0].get("region"),
            Some(&Value::String("EMEA".to_string()))
        );
    }

    // search matches case-insensitively on string values only.
    #[test]
    fn reference_opensearch_adapter_filters_rows() {
        let rows = vec![
            crate::StoredRow {
                id: 1,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Acme renewal".to_string()),
                )]),
            },
            crate::StoredRow {
                id: 2,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Globex onboarding".to_string()),
                )]),
            },
        ];
        let adapter = InMemoryOpenSearchAdapter::new(rows);
        let response = adapter
            .search(
                SearchRequest::new("accounts", "renewal"),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(response.total_hits, 1);
        assert_eq!(response.rows[0].id, 1);
    }

    // The in-memory sink buffers events in delivery order.
    #[test]
    fn reference_axiom_sink_records_events() {
        let sink = InMemoryAxiomSink::default();
        sink.send_event(AnalyticsEvent::new(
            "sales",
            "query_executed",
            json!({"latency_ms": 12}),
        ))
        .unwrap();
        let events = sink.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "query_executed");
    }

    // End-to-end: enqueue, mark succeeded, observe callback + poll result.
    #[test]
    fn reference_trigger_orchestrator_supports_completion_and_polling() {
        let orchestrator = InMemoryJobOrchestrator::default();
        let completed = Arc::new(Mutex::new(false));
        let completed_flag = completed.clone();
        orchestrator.register_completion_callback(Arc::new(move |status| {
            if status.state == JobState::Succeeded {
                if let Ok(mut guard) = completed_flag.lock() {
                    *guard = true;
                }
            }
        }));
        let handle = orchestrator
            .enqueue(JobRequest::new(
                "refresh_dashboard",
                json!({"account_id": 7}),
                "refresh-7",
            ))
            .unwrap();
        orchestrator
            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
            .unwrap();
        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
        assert_eq!(status.state, JobState::Succeeded);
        assert_eq!(
            status.result,
            Some(json!({
                "rows_synced": 18
            }))
        );
        assert!(*completed.lock().unwrap());
    }
}