use crate::{
error::{DataError, DataResult},
query::{Query, WireFormatProfile},
repo::{CompactRowsPayload, Row, StoredRow},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, HashMap},
fmt,
future::Future,
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use tokio::time::sleep;
use tracing::{info, warn};
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
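/// Coarse classification for failures crossing an integration boundary.
///
/// `IntegrationError::new` treats `Transient`, `RateLimited`, `Timeout`, and
/// `Unavailable` as retryable; the remaining kinds fail fast.
///
/// A minimal sketch (marked `ignore`; import paths from this module are assumed):
///
/// ```ignore
/// let throttled = IntegrationError::new(
///     "opensearch",
///     IntegrationErrorKind::RateLimited,
///     "429 Too Many Requests",
/// );
/// assert!(throttled.retryable);
///
/// let bad_input = IntegrationError::new(
///     "opensearch",
///     IntegrationErrorKind::InvalidInput,
///     "empty index",
/// );
/// assert!(!bad_input.retryable);
/// ```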
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
Transient,
Permanent,
Auth,
RateLimited,
Timeout,
Unavailable,
InvalidInput,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
pub source: String,
pub kind: IntegrationErrorKind,
pub message: String,
pub code: Option<String>,
pub retryable: bool,
}
impl IntegrationError {
pub fn new(
source: impl Into<String>,
kind: IntegrationErrorKind,
message: impl Into<String>,
) -> Self {
Self {
source: source.into(),
kind,
message: message.into(),
code: None,
retryable: matches!(
kind,
IntegrationErrorKind::Transient
| IntegrationErrorKind::RateLimited
| IntegrationErrorKind::Timeout
| IntegrationErrorKind::Unavailable
),
}
}
pub fn with_code(mut self, code: impl Into<String>) -> Self {
self.code = Some(code.into());
self
}
}
impl fmt::Display for IntegrationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(code) = &self.code {
write!(f, "[{}:{}] {}", self.source, code, self.message)
} else {
write!(f, "[{}] {}", self.source, self.message)
}
}
}
impl std::error::Error for IntegrationError {}
pub type IntegrationResult<T> = Result<T, IntegrationError>;
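/// Bounded retry settings: a fixed attempt budget plus an exponential backoff
/// that doubles from `initial_backoff_ms` and is capped at `max_backoff_ms`.
/// Only the async helpers actually sleep between attempts.
///
/// A small sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let policy = RetryPolicy::conservative();
/// assert_eq!(policy.max_attempts, 3);
/// assert_eq!(policy.initial_backoff_ms, 50);
/// assert_eq!(policy.max_backoff_ms, 500);
/// ```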
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub initial_backoff_ms: u64,
pub max_backoff_ms: u64,
}
impl RetryPolicy {
pub fn conservative() -> Self {
Self {
max_attempts: 3,
initial_backoff_ms: 50,
max_backoff_ms: 500,
}
}
pub fn never() -> Self {
Self {
max_attempts: 1,
initial_backoff_ms: 0,
max_backoff_ms: 0,
}
}
}
impl Default for RetryPolicy {
fn default() -> Self {
Self::conservative()
}
}
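/// Per-call execution contract pairing a `RetryPolicy` with a timeout budget.
/// Context tags set via `QueryContext::with_retry_policy` / `with_timeout_ms`
/// override these values at run time.
///
/// Illustrative only (marked `ignore`; paths assumed):
///
/// ```ignore
/// let contract = AdapterCallContract::low_latency().with_timeout_ms(100);
/// assert_eq!(contract.timeout_ms, 100);
/// assert_eq!(contract.retry_policy.max_attempts, 1);
/// ```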
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdapterCallContract {
pub retry_policy: RetryPolicy,
pub timeout_ms: u64,
}
impl AdapterCallContract {
pub fn low_latency() -> Self {
Self {
retry_policy: RetryPolicy::never(),
timeout_ms: 250,
}
}
pub fn default_query() -> Self {
Self {
retry_policy: RetryPolicy::conservative(),
timeout_ms: 1_500,
}
}
pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = retry_policy;
self
}
pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = timeout_ms.max(1);
self
}
}
impl Default for AdapterCallContract {
fn default() -> Self {
Self::default_query()
}
}
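/// Runs `operation_fn` under the merged contract: the context's retry and
/// timeout overrides win over `contract`, retryable errors are retried up to
/// the attempt budget, and an elapsed time beyond the timeout is reported as
/// `IntegrationErrorKind::Timeout` even when the closure returned `Ok`. The
/// synchronous variant never sleeps between attempts.
///
/// A hedged usage sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let context = QueryContext::default();
/// let value = run_with_contract(
///     "clickhouse",
///     "probe",
///     AdapterCallContract::default_query(),
///     &context,
///     |attempt| {
///         if attempt == 1 {
///             Err(IntegrationError::new(
///                 "clickhouse",
///                 IntegrationErrorKind::Transient,
///                 "temporary failure",
///             ))
///         } else {
///             Ok(attempt)
///         }
///     },
/// )
/// .unwrap();
/// assert_eq!(value, 2);
/// ```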
pub fn run_with_contract<T, F>(
source: &str,
operation: &str,
contract: AdapterCallContract,
context: &QueryContext,
mut operation_fn: F,
) -> IntegrationResult<T>
where
F: FnMut(u32) -> IntegrationResult<T>,
{
let policy = context
.retry_policy_override()
.unwrap_or(contract.retry_policy);
let attempts = policy.max_attempts.max(1);
let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
let mut backoff_ms = policy.initial_backoff_ms;
for attempt in 1..=attempts {
let started_at = Instant::now();
let result = operation_fn(attempt);
let elapsed_ms = started_at.elapsed().as_millis() as u64;
if elapsed_ms > timeout_ms {
return Err(IntegrationError::new(
source,
IntegrationErrorKind::Timeout,
format!(
"{operation} exceeded timeout: elapsed={}ms timeout={}ms",
elapsed_ms, timeout_ms
),
)
.with_code("operation_timeout"));
}
match result {
Ok(value) => return Ok(value),
Err(err) if !err.retryable || attempt == attempts => return Err(err),
Err(err) => {
warn!(
target: "shelly.integration.query",
source,
operation,
attempt,
max_attempts = attempts,
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
timeout_ms,
backoff_ms,
error = %err,
"Shelly integration retrying transient failure without blocking sleep; use run_with_contract_async for non-blocking backoff"
);
backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
}
}
}
// Defensive fallback; the loop above always returns on its final attempt.
Err(IntegrationError::new(
source,
IntegrationErrorKind::Transient,
format!("{operation} retry exhausted"),
)
.with_code("retry_exhausted"))
}
pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
where
F: FnMut(u32) -> IntegrationResult<T>,
{
let attempts = policy.max_attempts.max(1);
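// The synchronous path intentionally never sleeps; backoff_ms is doubled only
// so the progression mirrors run_with_retry_async.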
let mut backoff_ms = policy.initial_backoff_ms;
for attempt in 1..=attempts {
match operation(attempt) {
Ok(value) => return Ok(value),
Err(err) if !err.retryable || attempt == attempts => return Err(err),
Err(_) => {
backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
}
}
}
Err(IntegrationError::new(
"retry",
IntegrationErrorKind::Transient,
"retry exhausted",
))
}
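/// Async twin of `run_with_contract`: identical override and timeout
/// semantics, but transient failures back off with a non-blocking
/// `tokio::time::sleep` that doubles up to the policy cap.
///
/// Sketch (marked `ignore`; paths assumed, requires a Tokio runtime):
///
/// ```ignore
/// let context = QueryContext::default();
/// let value = run_with_contract_async(
///     "bigquery",
///     "probe",
///     AdapterCallContract::default_query(),
///     &context,
///     |attempt| async move {
///         if attempt == 1 {
///             Err(IntegrationError::new(
///                 "bigquery",
///                 IntegrationErrorKind::Unavailable,
///                 "warming up",
///             ))
///         } else {
///             Ok(attempt)
///         }
///     },
/// )
/// .await
/// .unwrap();
/// assert_eq!(value, 2);
/// ```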
pub async fn run_with_contract_async<T, F, Fut>(
source: &str,
operation: &str,
contract: AdapterCallContract,
context: &QueryContext,
mut operation_fn: F,
) -> IntegrationResult<T>
where
F: FnMut(u32) -> Fut,
Fut: Future<Output = IntegrationResult<T>>,
{
let policy = context
.retry_policy_override()
.unwrap_or(contract.retry_policy);
let attempts = policy.max_attempts.max(1);
let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
let mut backoff_ms = policy.initial_backoff_ms;
for attempt in 1..=attempts {
let started_at = Instant::now();
let result = operation_fn(attempt).await;
let elapsed_ms = started_at.elapsed().as_millis() as u64;
if elapsed_ms > timeout_ms {
return Err(IntegrationError::new(
source,
IntegrationErrorKind::Timeout,
format!(
"{operation} exceeded timeout: elapsed={}ms timeout={}ms",
elapsed_ms, timeout_ms
),
)
.with_code("operation_timeout"));
}
match result {
Ok(value) => return Ok(value),
Err(err) if !err.retryable || attempt == attempts => return Err(err),
Err(err) => {
warn!(
target: "shelly.integration.query",
source,
operation,
attempt,
max_attempts = attempts,
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
timeout_ms,
backoff_ms,
error = %err,
"Shelly integration retrying transient failure with non-blocking backoff"
);
if backoff_ms > 0 {
sleep(Duration::from_millis(backoff_ms)).await;
}
backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
}
}
}
// Defensive fallback; the loop above always returns on its final attempt.
Err(IntegrationError::new(
source,
IntegrationErrorKind::Transient,
format!("{operation} retry exhausted"),
)
.with_code("retry_exhausted"))
}
pub async fn run_with_retry_async<T, F, Fut>(
policy: RetryPolicy,
mut operation: F,
) -> IntegrationResult<T>
where
F: FnMut(u32) -> Fut,
Fut: Future<Output = IntegrationResult<T>>,
{
let attempts = policy.max_attempts.max(1);
let mut backoff_ms = policy.initial_backoff_ms;
for attempt in 1..=attempts {
match operation(attempt).await {
Ok(value) => return Ok(value),
Err(err) if !err.retryable || attempt == attempts => return Err(err),
Err(_) => {
if backoff_ms > 0 {
sleep(Duration::from_millis(backoff_ms)).await;
}
backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
}
}
}
Err(IntegrationError::new(
"retry",
IntegrationErrorKind::Transient,
"retry exhausted",
))
}
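/// Observer hooks invoked around connection setup and teardown. Both methods
/// default to no-ops, so implementors override only what they need.
///
/// Sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// struct NoopHook;
/// impl ConnectionLifecycleHook for NoopHook {}
///
/// let mut lifecycle = LifecycleHooks::new();
/// lifecycle.register_hook(Arc::new(NoopHook));
/// lifecycle.connect().unwrap();
/// lifecycle.disconnect().unwrap();
/// ```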
pub trait ConnectionLifecycleHook: Send + Sync {
fn on_connect(&self) -> IntegrationResult<()> {
Ok(())
}
fn on_disconnect(&self) -> IntegrationResult<()> {
Ok(())
}
}
pub trait ConnectionLifecycle {
fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
fn connect(&self) -> IntegrationResult<()>;
fn disconnect(&self) -> IntegrationResult<()>;
}
#[derive(Default)]
pub struct LifecycleHooks {
hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}
impl LifecycleHooks {
pub fn new() -> Self {
Self::default()
}
}
impl ConnectionLifecycle for LifecycleHooks {
fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
self.hooks.push(hook);
}
fn connect(&self) -> IntegrationResult<()> {
for hook in &self.hooks {
hook.on_connect()?;
}
Ok(())
}
fn disconnect(&self) -> IntegrationResult<()> {
for hook in &self.hooks {
hook.on_disconnect()?;
}
Ok(())
}
}
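/// Ambient call metadata: tenant, trace id, and a tag map that carries
/// correlation/request ids plus optional retry and timeout overrides under the
/// `CONTEXT_TAG_*` keys.
///
/// Sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let context = QueryContext::default()
///     .with_tenant_id("tenant-a")
///     .with_correlation_id("corr-1")
///     .with_request_id("req-1")
///     .with_timeout_ms(750);
/// assert_eq!(context.correlation_id(), Some("corr-1"));
/// assert_eq!(context.timeout_ms(), Some(750));
/// ```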
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
pub tenant_id: Option<String>,
pub trace_id: Option<String>,
pub tags: BTreeMap<String, String>,
}
impl QueryContext {
pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
let tenant_id = tenant_id.into();
let trimmed = tenant_id.trim();
self.tenant_id = (!trimmed.is_empty()).then(|| trimmed.to_string());
self
}
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id)
}
pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id)
}
pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
}
pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
self.with_tag(
CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
policy.max_attempts.to_string(),
)
.with_tag(
CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
policy.initial_backoff_ms.to_string(),
)
.with_tag(
CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
policy.max_backoff_ms.to_string(),
)
}
pub fn correlation_id(&self) -> Option<&str> {
self.tags
.get(CONTEXT_TAG_CORRELATION_ID)
.map(String::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
}
pub fn request_id(&self) -> Option<&str> {
self.tags
.get(CONTEXT_TAG_REQUEST_ID)
.map(String::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
}
pub fn timeout_ms(&self) -> Option<u64> {
self.tags
.get(CONTEXT_TAG_TIMEOUT_MS)
.and_then(|value| value.parse::<u64>().ok())
.filter(|value| *value > 0)
}
pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
let max_attempts = self
.tags
.get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
.and_then(|value| value.parse::<u32>().ok())?;
let initial_backoff_ms = self
.tags
.get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
.and_then(|value| value.parse::<u64>().ok())
.unwrap_or(0);
let max_backoff_ms = self
.tags
.get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
.and_then(|value| value.parse::<u64>().ok())
.unwrap_or(initial_backoff_ms);
Some(RetryPolicy {
max_attempts: max_attempts.max(1),
initial_backoff_ms,
max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
})
}
}
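/// The single typed entry point every adapter implements; the `*Adapter`
/// traits below layer logging and windowing on top of `execute`.
///
/// A minimal implementation sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// struct EchoAdapter;
/// impl TypedQueryBoundary for EchoAdapter {
///     type Request = SqlCommand;
///     type Response = Vec<Row>;
///     fn execute(
///         &self,
///         _request: &Self::Request,
///         _context: &QueryContext,
///     ) -> IntegrationResult<Self::Response> {
///         Ok(Vec::new())
///     }
/// }
/// ```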
pub trait TypedQueryBoundary {
type Request: Send + Sync;
type Response: Send + Sync;
fn execute(
&self,
request: &Self::Request,
context: &QueryContext,
) -> IntegrationResult<Self::Response>;
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
pub statement: String,
pub params: Vec<Value>,
}
impl SqlCommand {
pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
Self {
statement: statement.into(),
params,
}
}
}
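/// Pagination request for high-volume reads: `offset`/`limit` select the
/// window, the optional tokens make responses resumable, and `compact()`
/// switches the wire format to the columnar `CompactRowsPayload`.
///
/// Sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let request = DataWindowRequest::new("tenant_rollup", 100, 50)
///     .with_query_token("q:abc")
///     .compact();
/// assert_eq!(request.limit, 50);
/// ```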
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowRequest {
pub dataset: String,
pub offset: usize,
pub limit: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query_token: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub window_token: Option<String>,
#[serde(default)]
pub wire_format: WireFormatProfile,
}
impl DataWindowRequest {
pub fn new(dataset: impl Into<String>, offset: usize, limit: usize) -> Self {
Self {
dataset: dataset.into(),
offset,
limit: limit.max(1),
query_token: None,
window_token: None,
wire_format: WireFormatProfile::Json,
}
}
pub fn with_query_token(mut self, query_token: impl Into<String>) -> Self {
self.query_token = Some(query_token.into());
self
}
pub fn with_window_token(mut self, window_token: impl Into<String>) -> Self {
self.window_token = Some(window_token.into());
self
}
pub fn compact(mut self) -> Self {
self.wire_format = WireFormatProfile::Compact;
self
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowResponse {
pub dataset: String,
pub offset: usize,
pub limit: usize,
pub total_rows: usize,
pub query_token: String,
pub window_token: String,
pub rows: Vec<Row>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compact_rows: Option<CompactRowsPayload>,
}
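/// SQL adapter trait whose provided `run_query` wraps `execute` with
/// structured success/failure logging; `run_high_volume_window_query` reuses
/// it and materializes a `DataWindowResponse`.
///
/// Sketch using the in-memory reference adapter (marked `ignore`; paths assumed):
///
/// ```ignore
/// let adapter = InMemorySingleStoreAdapter::new(Vec::new());
/// let rows = adapter
///     .run_query(
///         SqlCommand::new("SELECT 1", Vec::new()),
///         &QueryContext::default(),
///     )
///     .unwrap();
/// assert!(rows.is_empty());
/// ```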
pub trait SingleStoreAdapter:
TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
{
fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
let started_at = Instant::now();
let statement = query.statement.as_str();
let param_count = query.params.len();
let result = self.execute(&query, context);
match &result {
Ok(rows) => info!(
target: "shelly.integration.query",
source = "singlestore",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
row_count = rows.len(),
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration query executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "singlestore",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration query failed"
),
}
result
}
fn run_high_volume_window_query(
&self,
query: SqlCommand,
request: DataWindowRequest,
context: &QueryContext,
) -> IntegrationResult<DataWindowResponse> {
let rows = self.run_query(query, context)?;
Ok(materialize_window_response(
"singlestore",
request,
rows,
None,
))
}
}
pub trait ClickHouseAdapter:
TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
{
fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
let started_at = Instant::now();
let statement = query.statement.as_str();
let param_count = query.params.len();
let result = self.execute(&query, context);
match &result {
Ok(rows) => info!(
target: "shelly.integration.query",
source = "clickhouse",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
row_count = rows.len(),
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration query executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "clickhouse",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration query failed"
),
}
result
}
fn run_high_volume_window_query(
&self,
query: SqlCommand,
request: DataWindowRequest,
context: &QueryContext,
) -> IntegrationResult<DataWindowResponse> {
let rows = self.run_query(query, context)?;
Ok(materialize_window_response(
"clickhouse",
request,
rows,
None,
))
}
}
pub trait BigQueryAdapter:
TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
{
fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
let started_at = Instant::now();
let statement = query.statement.as_str();
let param_count = query.params.len();
let result = self.execute(&query, context);
match &result {
Ok(rows) => info!(
target: "shelly.integration.query",
source = "bigquery",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
row_count = rows.len(),
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration query executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "bigquery",
operation = "run_query",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
statement,
param_count,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration query failed"
),
}
result
}
fn run_high_volume_window_query(
&self,
query: SqlCommand,
request: DataWindowRequest,
context: &QueryContext,
) -> IntegrationResult<DataWindowResponse> {
let rows = self.run_query(query, context)?;
Ok(materialize_window_response("bigquery", request, rows, None))
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
pub index: String,
pub text: String,
pub filters: Vec<crate::query::Filter>,
pub page: usize,
pub per_page: usize,
}
impl SearchRequest {
pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
Self {
index: index.into(),
text: text.into(),
filters: Vec::new(),
page: 1,
per_page: 25,
}
}
pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
self.filters.push(filter);
self
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
pub total_hits: usize,
pub rows: Vec<StoredRow>,
}
pub trait OpenSearchAdapter:
TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse> + Send + Sync
{
fn search(
&self,
request: SearchRequest,
context: &QueryContext,
) -> IntegrationResult<SearchResponse> {
let started_at = Instant::now();
let index = request.index.as_str();
let text = request.text.as_str();
let filter_count = request.filters.len();
let page = request.page;
let per_page = request.per_page;
let result = self.execute(&request, context);
match &result {
Ok(response) => info!(
target: "shelly.integration.query",
source = "opensearch",
operation = "search",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
index,
text,
filter_count,
page,
per_page,
row_count = response.rows.len(),
total_hits = response.total_hits,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration query executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "opensearch",
operation = "search",
tenant_id = ?context.tenant_id,
trace_id = ?context.trace_id,
correlation_id = context.correlation_id().unwrap_or("-"),
request_id = context.request_id().unwrap_or("-"),
tag_count = context.tags.len(),
index,
text,
filter_count,
page,
per_page,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration query failed"
),
}
result
}
fn search_window(
&self,
mut request: SearchRequest,
window: DataWindowRequest,
context: &QueryContext,
) -> IntegrationResult<DataWindowResponse> {
let limit = window.limit.max(1);
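// Page-based search cannot address arbitrary offsets, so the offset is
// rounded down to the nearest page boundary when translated to a page number.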
request.page = (window.offset / limit).saturating_add(1);
request.per_page = limit;
let response = self.search(request, context)?;
let rows = response
.rows
.into_iter()
.map(|row| row.data)
.collect::<Vec<_>>();
let query_token = window
.query_token
.unwrap_or_else(|| format!("opensearch:{}:{}", window.dataset, response.total_hits));
let window_token = window.window_token.unwrap_or_else(|| {
format!(
"{query_token}:offset={}:limit={limit}:rows={}",
window.offset,
rows.len()
)
});
let compact_rows =
(window.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
Ok(DataWindowResponse {
dataset: window.dataset,
offset: window.offset,
limit,
total_rows: response.total_hits,
query_token,
window_token,
rows,
compact_rows,
})
}
}
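/// Fire-and-forget sink for analytics events; `InMemoryAxiomSink` below is the
/// reference implementation used in tests.
///
/// Sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let sink = InMemoryAxiomSink::default();
/// sink.send_event(AnalyticsEvent::new(
///     "sales",
///     "query_executed",
///     serde_json::json!({"latency_ms": 12}),
/// ))
/// .unwrap();
/// assert_eq!(sink.events().len(), 1);
/// ```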
pub trait AnalyticsSink: Send + Sync {
fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
pub namespace: String,
pub name: String,
pub payload: Value,
}
impl AnalyticsEvent {
pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
Self {
namespace: namespace.into(),
name: name.into(),
payload,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
pub workflow: String,
pub payload: Value,
pub idempotency_key: String,
pub metadata: BTreeMap<String, String>,
}
impl JobRequest {
pub fn new(
workflow: impl Into<String>,
payload: Value,
idempotency_key: impl Into<String>,
) -> Self {
Self {
workflow: workflow.into(),
payload,
idempotency_key: idempotency_key.into(),
metadata: BTreeMap::new(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
Queued,
Running,
Succeeded,
Failed,
Canceled,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
pub id: String,
pub state: JobState,
pub attempts: u32,
pub result: Option<Value>,
pub error: Option<IntegrationError>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
pub id: String,
pub workflow: String,
pub idempotency_key: String,
}
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
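/// Contract for enqueueing idempotent jobs, inspecting status, polling with
/// bounded attempts, and observing completion via registered callbacks.
///
/// Sketch with the in-memory orchestrator (marked `ignore`; paths assumed):
///
/// ```ignore
/// let jobs = InMemoryJobOrchestrator::default();
/// let handle = jobs
///     .enqueue(JobRequest::new("sync", serde_json::json!({}), "key-1"))
///     .unwrap();
/// jobs.mark_succeeded(&handle.id, serde_json::json!({"ok": true})).unwrap();
/// assert_eq!(jobs.poll(&handle.id, 3, 0).unwrap().state, JobState::Succeeded);
/// ```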
pub trait JobOrchestrator: Send + Sync {
fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
fn register_completion_callback(&self, callback: JobCompletionCallback);
}
pub trait TriggerDevAdapter: Send + Sync {
fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
fn poll_workflow(
&self,
id: &str,
attempts: u32,
backoff_ms: u64,
) -> IntegrationResult<JobStatus>;
}
impl<T> TriggerDevAdapter for T
where
T: JobOrchestrator + Send + Sync,
{
fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
self.enqueue(request)
}
fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
self.status(id)
}
fn poll_workflow(
&self,
id: &str,
attempts: u32,
backoff_ms: u64,
) -> IntegrationResult<JobStatus> {
self.poll(id, attempts, backoff_ms)
}
}
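/// Wraps an `AnalyticsSink` and enriches every payload with tenant, trace,
/// correlation, and request metadata from the `QueryContext` before emitting.
///
/// Sketch (marked `ignore`; paths assumed):
///
/// ```ignore
/// let sink = Arc::new(InMemoryAxiomSink::default());
/// let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
/// bridge
///     .emit("probe", serde_json::json!({"ok": true}), &QueryContext::default())
///     .unwrap();
/// assert_eq!(sink.events().len(), 1);
/// ```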
#[derive(Clone)]
pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
sink: Arc<S>,
namespace: String,
}
impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
Self {
sink,
namespace: namespace.into(),
}
}
pub fn emit(
&self,
name: impl Into<String>,
payload: Value,
context: &QueryContext,
) -> IntegrationResult<()> {
let mut envelope = serde_json::Map::new();
envelope.insert("payload".to_string(), payload);
envelope.insert(
"tenant_id".to_string(),
context
.tenant_id
.as_ref()
.map(|value| Value::String(value.clone()))
.unwrap_or(Value::Null),
);
envelope.insert(
"trace_id".to_string(),
context
.trace_id
.as_ref()
.map(|value| Value::String(value.clone()))
.unwrap_or(Value::Null),
);
envelope.insert(
"correlation_id".to_string(),
context
.correlation_id()
.map(|value| Value::String(value.to_string()))
.unwrap_or(Value::Null),
);
envelope.insert(
"request_id".to_string(),
context
.request_id()
.map(|value| Value::String(value.to_string()))
.unwrap_or(Value::Null),
);
envelope.insert("tags".to_string(), serde_json::json!(context.tags));
self.sink.send_event(AnalyticsEvent::new(
self.namespace.clone(),
name.into(),
Value::Object(envelope),
))
}
}
#[derive(Debug, Clone, Default)]
pub struct InMemorySingleStoreAdapter {
rows: Vec<Row>,
}
impl InMemorySingleStoreAdapter {
pub fn new(rows: Vec<Row>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemorySingleStoreAdapter {
type Request = SqlCommand;
type Response = Vec<Row>;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.statement.trim().is_empty() {
return Err(IntegrationError::new(
"singlestore",
IntegrationErrorKind::InvalidInput,
"empty SQL statement",
)
.with_code("empty_statement"));
}
Ok(self.rows.clone())
}
}
impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
#[derive(Debug, Clone, Default)]
pub struct InMemoryClickHouseAdapter {
rows: Vec<Row>,
}
impl InMemoryClickHouseAdapter {
pub fn new(rows: Vec<Row>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemoryClickHouseAdapter {
type Request = SqlCommand;
type Response = Vec<Row>;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.statement.trim().is_empty() {
return Err(IntegrationError::new(
"clickhouse",
IntegrationErrorKind::InvalidInput,
"empty SQL statement",
)
.with_code("empty_statement"));
}
Ok(self.rows.clone())
}
}
impl ClickHouseAdapter for InMemoryClickHouseAdapter {}
#[derive(Debug, Clone, Default)]
pub struct InMemoryBigQueryAdapter {
rows: Vec<Row>,
}
impl InMemoryBigQueryAdapter {
pub fn new(rows: Vec<Row>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemoryBigQueryAdapter {
type Request = SqlCommand;
type Response = Vec<Row>;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.statement.trim().is_empty() {
return Err(IntegrationError::new(
"bigquery",
IntegrationErrorKind::InvalidInput,
"empty SQL statement",
)
.with_code("empty_statement"));
}
Ok(self.rows.clone())
}
}
impl BigQueryAdapter for InMemoryBigQueryAdapter {}
#[derive(Debug, Clone, Default)]
pub struct InMemoryOpenSearchAdapter {
rows: Vec<StoredRow>,
}
impl InMemoryOpenSearchAdapter {
pub fn new(rows: Vec<StoredRow>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
type Request = SearchRequest;
type Response = SearchResponse;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.index.trim().is_empty() {
return Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::InvalidInput,
"search index must not be empty",
)
.with_code("empty_index"));
}
let needle = request.text.trim().to_lowercase();
let mut filtered = self
.rows
.iter()
.filter(|row| {
if needle.is_empty() {
return true;
}
row.data.values().any(|value| {
value
.as_str()
.map(|text| text.to_lowercase().contains(&needle))
.unwrap_or(false)
})
})
.cloned()
.collect::<Vec<_>>();
let total_hits = filtered.len();
let page = request.page.max(1);
let per_page = request.per_page.max(1);
let offset = (page - 1) * per_page;
filtered = filtered.into_iter().skip(offset).take(per_page).collect();
Ok(SearchResponse {
total_hits,
rows: filtered,
})
}
}
impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
#[derive(Debug, Default, Clone)]
pub struct InMemoryAxiomSink {
events: Arc<Mutex<Vec<AnalyticsEvent>>>,
}
impl InMemoryAxiomSink {
pub fn events(&self) -> Vec<AnalyticsEvent> {
self.events
.lock()
.map(|events| events.clone())
.unwrap_or_default()
}
}
impl AnalyticsSink for InMemoryAxiomSink {
fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
let started_at = Instant::now();
let namespace = event.namespace.clone();
let name = event.name.clone();
self.events
.lock()
.map_err(|_| {
IntegrationError::new(
"axiom",
IntegrationErrorKind::Unavailable,
"analytics sink lock poisoned",
)
})?
.push(event);
info!(
target: "shelly.integration.query",
source = "axiom",
operation = "send_event",
namespace,
event_name = name,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration call executed"
);
Ok(())
}
}
#[derive(Default)]
pub struct InMemoryJobOrchestrator {
statuses: Mutex<HashMap<String, JobStatus>>,
callbacks: Mutex<Vec<JobCompletionCallback>>,
next_id: Mutex<u64>,
}
impl InMemoryJobOrchestrator {
pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
let status = self.with_status_mut(id, |status| {
status.state = JobState::Succeeded;
status.result = Some(result);
status.error = None;
})?;
self.notify(&status);
Ok(())
}
pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
let status = self.with_status_mut(id, |status| {
status.state = JobState::Failed;
status.result = None;
status.error = Some(error);
})?;
self.notify(&status);
Ok(())
}
fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
where
F: FnOnce(&mut JobStatus),
{
let mut statuses = self.statuses.lock().map_err(|_| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"job status lock poisoned",
)
})?;
let Some(status) = statuses.get_mut(id) else {
return Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::InvalidInput,
"job not found",
)
.with_code("job_not_found"));
};
status.attempts = status.attempts.saturating_add(1);
update(status);
Ok(status.clone())
}
fn notify(&self, status: &JobStatus) {
if let Ok(callbacks) = self.callbacks.lock() {
for callback in callbacks.iter() {
callback(status);
}
}
}
fn is_terminal(state: JobState) -> bool {
matches!(
state,
JobState::Succeeded | JobState::Failed | JobState::Canceled
)
}
}
impl JobOrchestrator for InMemoryJobOrchestrator {
fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
let started_at = Instant::now();
let workflow = request.workflow.clone();
let idempotency_key = request.idempotency_key.clone();
let mut next_id = self.next_id.lock().map_err(|_| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"job id lock poisoned",
)
})?;
*next_id = next_id.saturating_add(1);
let id = format!("job-{next_id}");
let status = JobStatus {
id: id.clone(),
state: JobState::Queued,
attempts: 0,
result: None,
error: None,
};
self.statuses
.lock()
.map_err(|_| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"job status lock poisoned",
)
})?
.insert(id.clone(), status);
let handle = JobHandle {
id,
workflow: request.workflow,
idempotency_key: request.idempotency_key,
};
info!(
target: "shelly.integration.query",
source = "trigger",
operation = "enqueue",
workflow,
idempotency_key,
job_id = handle.id.as_str(),
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration call executed"
);
Ok(handle)
}
fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
let started_at = Instant::now();
let result = self
.statuses
.lock()
.map_err(|_| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"job status lock poisoned",
)
})?
.get(id)
.cloned()
.ok_or_else(|| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::InvalidInput,
"job not found",
)
.with_code("job_not_found")
});
match &result {
Ok(status) => info!(
target: "shelly.integration.query",
source = "trigger",
operation = "status",
job_id = id,
job_state = ?status.state,
attempts = status.attempts,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration call executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "trigger",
operation = "status",
job_id = id,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration call failed"
),
}
result
}
fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
let started_at = Instant::now();
let attempts = attempts.max(1);
let mut polled = 0u32;
for current in 1..=attempts {
let status = self.status(id)?;
polled = current;
if Self::is_terminal(status.state) || current == attempts {
info!(
target: "shelly.integration.query",
source = "trigger",
operation = "poll",
job_id = id,
attempts = polled,
backoff_ms,
terminal = Self::is_terminal(status.state),
job_state = ?status.state,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration call executed"
);
return Ok(status);
}
if backoff_ms > 0 {
thread::sleep(Duration::from_millis(backoff_ms));
}
}
let result = self.status(id);
match &result {
Ok(status) => info!(
target: "shelly.integration.query",
source = "trigger",
operation = "poll",
job_id = id,
attempts = polled,
backoff_ms,
terminal = Self::is_terminal(status.state),
job_state = ?status.state,
duration_ms = started_at.elapsed().as_millis() as u64,
"Shelly integration call executed"
),
Err(err) => warn!(
target: "shelly.integration.query",
source = "trigger",
operation = "poll",
job_id = id,
attempts = polled,
backoff_ms,
duration_ms = started_at.elapsed().as_millis() as u64,
error = %err,
"Shelly integration call failed"
),
}
result
}
fn register_completion_callback(&self, callback: JobCompletionCallback) {
if let Ok(mut callbacks) = self.callbacks.lock() {
callbacks.push(callback);
info!(
target: "shelly.integration.query",
source = "trigger",
operation = "register_completion_callback",
callback_count = callbacks.len(),
"Shelly integration callback registered"
);
} else {
warn!(
target: "shelly.integration.query",
source = "trigger",
operation = "register_completion_callback",
"Shelly integration callback registration failed"
);
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AdapterConformanceCheck {
pub name: String,
pub passed: bool,
pub detail: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct AdapterConformanceReport {
pub checks: Vec<AdapterConformanceCheck>,
}
impl AdapterConformanceReport {
pub fn passed(&self) -> bool {
self.checks.iter().all(|check| check.passed)
}
pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
self.checks.push(AdapterConformanceCheck {
name: name.into(),
passed: true,
detail: detail.into(),
});
}
pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
self.checks.push(AdapterConformanceCheck {
name: name.into(),
passed: false,
detail: detail.into(),
});
}
}
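/// Probes every adapter seam (SQL queries, search, job orchestration,
/// analytics, context propagation, retry override, timeout classification)
/// and records one pass/fail check per seam.
///
/// Sketch wiring the in-memory reference adapters (marked `ignore`; paths assumed):
///
/// ```ignore
/// let report = run_adapter_conformance_suite(
///     &InMemorySingleStoreAdapter::default(),
///     &InMemoryClickHouseAdapter::default(),
///     &InMemoryBigQueryAdapter::default(),
///     &InMemoryOpenSearchAdapter::default(),
///     &InMemoryJobOrchestrator::default(),
///     &InMemoryAxiomSink::default(),
///     &QueryContext::default()
///         .with_correlation_id("corr-1")
///         .with_request_id("req-1"),
/// );
/// assert!(report.passed());
/// ```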
pub fn run_adapter_conformance_suite(
singlestore: &dyn SingleStoreAdapter,
clickhouse: &dyn ClickHouseAdapter,
bigquery: &dyn BigQueryAdapter,
opensearch: &dyn OpenSearchAdapter,
trigger: &dyn TriggerDevAdapter,
analytics: &dyn AnalyticsSink,
context: &QueryContext,
) -> AdapterConformanceReport {
let mut report = AdapterConformanceReport::default();
match singlestore.run_query(
SqlCommand::new(
"SELECT tenant, active_accounts FROM tenant_rollup",
Vec::new(),
),
context,
) {
Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
Err(err) => report.add_fail("singlestore.query", err.to_string()),
}
match clickhouse.run_query(
SqlCommand::new("SELECT account, events FROM account_events", Vec::new()),
context,
) {
Ok(rows) => report.add_pass("clickhouse.query", format!("rows={}", rows.len())),
Err(err) => report.add_fail("clickhouse.query", err.to_string()),
}
match bigquery.run_query(
SqlCommand::new("SELECT account, arr FROM analytics_rollup", Vec::new()),
context,
) {
Ok(rows) => report.add_pass("bigquery.query", format!("rows={}", rows.len())),
Err(err) => report.add_fail("bigquery.query", err.to_string()),
}
match opensearch.search(SearchRequest::new("accounts", ""), context) {
Ok(response) => report.add_pass(
"opensearch.search",
format!(
"rows={} total_hits={}",
response.rows.len(),
response.total_hits
),
),
Err(err) => report.add_fail("opensearch.search", err.to_string()),
}
match trigger.trigger_workflow(JobRequest::new(
"adapter_conformance_probe",
serde_json::json!({"probe": true}),
"adapter-conformance-probe",
)) {
Ok(handle) => {
report.add_pass("trigger.enqueue", format!("id={}", handle.id));
match trigger.workflow_status(&handle.id) {
Ok(status) => report.add_pass(
"trigger.status",
format!("state={:?} attempts={}", status.state, status.attempts),
),
Err(err) => report.add_fail("trigger.status", err.to_string()),
}
}
Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
}
let analytics_event = AnalyticsEvent::new(
"adapter_conformance",
"probe",
serde_json::json!({
"trace_id": context.trace_id,
"correlation_id": context.correlation_id(),
"request_id": context.request_id(),
}),
);
match analytics.send_event(analytics_event) {
Ok(()) => report.add_pass("analytics.emit", "event accepted"),
Err(err) => report.add_fail("analytics.emit", err.to_string()),
}
let context_correlation_check =
context.correlation_id().is_some() && context.request_id().is_some();
if context_correlation_check {
report.add_pass(
"context.correlation",
"correlation_id and request_id present",
);
} else {
report.add_fail(
"context.correlation",
"missing correlation_id/request_id; set QueryContext::with_correlation_id + with_request_id",
);
}
let retry_override_context = context.clone().with_retry_policy(RetryPolicy {
max_attempts: 2,
initial_backoff_ms: 0,
max_backoff_ms: 0,
});
let mut retry_attempts = 0u32;
match run_with_contract(
"conformance",
"retry_override_probe",
AdapterCallContract::default_query(),
&retry_override_context,
|attempt| {
retry_attempts = attempt;
if attempt == 1 {
Err(IntegrationError::new(
"conformance",
IntegrationErrorKind::Transient,
"transient probe failure",
))
} else {
Ok("ok")
}
},
) {
Ok(_) if retry_attempts == 2 => report.add_pass(
"contract.retry_override",
"retry override honored with bounded attempts",
),
Ok(_) => report.add_fail(
"contract.retry_override",
format!("unexpected attempts={retry_attempts} expected=2"),
),
Err(err) => report.add_fail("contract.retry_override", err.to_string()),
}
let timeout_context = context.clone().with_timeout_ms(1);
match run_with_contract(
"conformance",
"timeout_probe",
AdapterCallContract::default_query(),
&timeout_context,
|_| {
thread::sleep(Duration::from_millis(5));
Ok::<_, IntegrationError>("done")
},
) {
Err(err) if err.kind == IntegrationErrorKind::Timeout => {
report.add_pass("contract.timeout", "timeout classification validated")
}
Err(err) => report.add_fail("contract.timeout", err.to_string()),
Ok(_) => report.add_fail(
"contract.timeout",
"expected timeout but operation returned success",
),
}
report
}
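/// Converts an `IntegrationError` into the crate-level `DataError`, prefixing
/// the caller-supplied `source` (which may differ from `err.source`) before
/// the error's own `[source] message` rendering.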
pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
DataError::Integration(format!("[{}] {}", source.into(), err))
}
pub fn map_integration_result<T>(
source: impl Into<String>,
result: IntegrationResult<T>,
) -> DataResult<T> {
result.map_err(|err| map_integration_error(source, err))
}
pub fn query_from_search(request: &SearchRequest) -> Query {
let mut query = Query::new().paginate(request.page, request.per_page);
for filter in &request.filters {
query = query.where_filter(filter.clone());
}
query
}
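/// Builds a `DataWindowResponse` from fully materialized rows: clamps the
/// offset to the total, slices the window, derives query/window tokens when
/// the request did not supply them, and encodes a compact payload when the
/// request asked for `WireFormatProfile::Compact`.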
fn materialize_window_response(
source: &str,
request: DataWindowRequest,
rows: Vec<Row>,
total_rows_hint: Option<usize>,
) -> DataWindowResponse {
let total_rows = total_rows_hint.unwrap_or(rows.len());
let offset = request.offset.min(total_rows);
let limit = request.limit.max(1);
let window_rows = rows
.into_iter()
.skip(offset)
.take(limit)
.collect::<Vec<_>>();
let query_token = request
.query_token
.unwrap_or_else(|| format!("{source}:{}:{}", request.dataset, total_rows));
let window_token = request.window_token.unwrap_or_else(|| {
format!(
"{query_token}:offset={offset}:limit={limit}:rows={}",
window_rows.len()
)
});
let compact_rows = (request.wire_format == WireFormatProfile::Compact)
.then(|| encode_compact_rows(&window_rows));
DataWindowResponse {
dataset: request.dataset,
offset,
limit,
total_rows,
query_token,
window_token,
rows: window_rows,
compact_rows,
}
}
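/// Columnar encoding: the column list is the first-seen union of row keys,
/// and each row becomes a `Vec<Value>` aligned to that list with `Value::Null`
/// filling the gaps.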
fn encode_compact_rows(rows: &[Row]) -> CompactRowsPayload {
let mut columns: Vec<String> = Vec::new();
for row in rows {
for key in row.keys() {
if !columns.iter().any(|column| column == key) {
columns.push(key.clone());
}
}
}
let encoded_rows = rows
.iter()
.map(|row| {
columns
.iter()
.map(|column| row.get(column).cloned().unwrap_or(Value::Null))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
CompactRowsPayload {
columns,
rows: encoded_rows,
}
}
#[cfg(test)]
mod tests {
use super::{
map_integration_result, run_adapter_conformance_suite, run_with_contract,
run_with_contract_async, run_with_retry, run_with_retry_async, AdapterCallContract,
AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge, BigQueryAdapter, ClickHouseAdapter,
ConnectionLifecycle, ConnectionLifecycleHook, DataWindowRequest, InMemoryAxiomSink,
InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryJobOrchestrator,
InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
SearchResponse, SingleStoreAdapter, SqlCommand, TriggerDevAdapter, TypedQueryBoundary,
CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID, CONTEXT_TAG_TIMEOUT_MS,
};
use serde_json::{json, Value};
use std::sync::{Arc, Mutex};
struct CountingHook {
connects: Arc<Mutex<u32>>,
disconnects: Arc<Mutex<u32>>,
}
impl ConnectionLifecycleHook for CountingHook {
fn on_connect(&self) -> super::IntegrationResult<()> {
let mut guard = self.connects.lock().unwrap();
*guard += 1;
Ok(())
}
fn on_disconnect(&self) -> super::IntegrationResult<()> {
let mut guard = self.disconnects.lock().unwrap();
*guard += 1;
Ok(())
}
}
#[test]
fn lifecycle_hooks_are_called() {
let connects = Arc::new(Mutex::new(0));
let disconnects = Arc::new(Mutex::new(0));
let mut lifecycle = LifecycleHooks::new();
lifecycle.register_hook(Arc::new(CountingHook {
connects: connects.clone(),
disconnects: disconnects.clone(),
}));
lifecycle.connect().unwrap();
lifecycle.disconnect().unwrap();
assert_eq!(*connects.lock().unwrap(), 1);
assert_eq!(*disconnects.lock().unwrap(), 1);
}
#[test]
fn retry_policy_retries_transient_errors() {
let mut calls = 0u32;
let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
calls = attempt;
if attempt < 3 {
Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::Transient,
"temporary failure",
))
} else {
Ok("ok")
}
})
.unwrap();
assert_eq!(result, "ok");
assert_eq!(calls, 3);
}
#[test]
fn retry_policy_stops_on_permanent_errors() {
let mut calls = 0u32;
let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
calls = attempt;
Err::<(), IntegrationError>(IntegrationError::new(
"singlestore",
IntegrationErrorKind::Permanent,
"invalid sql",
))
})
.unwrap_err();
assert_eq!(calls, 1);
assert_eq!(err.kind, IntegrationErrorKind::Permanent);
}
#[test]
fn integration_result_maps_into_data_error() {
let mapped = map_integration_result::<()>(
"trigger",
Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"service unavailable",
)),
)
.unwrap_err();
assert!(mapped.to_string().contains("service unavailable"));
}
#[derive(Default)]
struct InMemoryJobs {
statuses: Arc<Mutex<Vec<JobStatus>>>,
callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
}
impl JobOrchestrator for InMemoryJobs {
fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
let id = format!("job-{}", request.idempotency_key);
self.statuses.lock().unwrap().push(JobStatus {
id: id.clone(),
state: JobState::Queued,
attempts: 0,
result: None,
error: None,
});
Ok(JobHandle {
id,
workflow: request.workflow,
idempotency_key: request.idempotency_key,
})
}
fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
self.statuses
.lock()
.unwrap()
.iter()
.find(|status| status.id == id)
.cloned()
.ok_or_else(|| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::InvalidInput,
"job not found",
)
})
}
fn poll(
&self,
id: &str,
_attempts: u32,
_backoff_ms: u64,
) -> super::IntegrationResult<JobStatus> {
self.status(id)
}
fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
self.callbacks.lock().unwrap().push(callback);
}
}
#[test]
fn job_orchestration_contract_supports_enqueue_and_status() {
let jobs = InMemoryJobs::default();
let handle = jobs
.enqueue(JobRequest::new(
"sync_customer",
json!({"id": 42}),
"idempotent-42",
))
.unwrap();
let status = jobs.status(&handle.id).unwrap();
assert_eq!(status.state, JobState::Queued);
assert_eq!(handle.idempotency_key, "idempotent-42");
let ctx = QueryContext::default().with_tenant_id("tenant-a");
assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
}
#[test]
fn reference_singlestore_adapter_runs_typed_sql_boundary() {
let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
"region".to_string(),
Value::String("EMEA".to_string()),
)])]);
let rows = adapter
.run_query(
SqlCommand::new("SELECT region FROM accounts", Vec::new()),
&QueryContext::default(),
)
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0].get("region"),
Some(&Value::String("EMEA".to_string()))
);
}
#[test]
fn reference_clickhouse_adapter_runs_typed_sql_boundary() {
let adapter = InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
"events".to_string(),
Value::Number(128.into()),
)])]);
let rows = adapter
.run_query(
SqlCommand::new("SELECT events FROM account_events", Vec::new()),
&QueryContext::default(),
)
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get("events"), Some(&Value::Number(128.into())));
}
#[test]
fn reference_bigquery_adapter_runs_typed_sql_boundary() {
let adapter = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
"active_accounts".to_string(),
Value::Number(64.into()),
)])]);
let rows = adapter
.run_query(
SqlCommand::new("SELECT active_accounts FROM analytics_rollup", Vec::new()),
&QueryContext::default(),
)
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0].get("active_accounts"),
Some(&Value::Number(64.into()))
);
}
#[test]
fn reference_opensearch_adapter_filters_rows() {
let rows = vec![
crate::StoredRow {
id: 1,
data: std::collections::BTreeMap::from([(
"title".to_string(),
Value::String("Acme renewal".to_string()),
)]),
},
crate::StoredRow {
id: 2,
data: std::collections::BTreeMap::from([(
"title".to_string(),
Value::String("Globex onboarding".to_string()),
)]),
},
];
let adapter = InMemoryOpenSearchAdapter::new(rows);
let response = adapter
.search(
SearchRequest::new("accounts", "renewal"),
&QueryContext::default(),
)
.unwrap();
assert_eq!(response.total_hits, 1);
assert_eq!(response.rows[0].id, 1);
}
#[test]
fn high_volume_window_query_profiles_support_compact_payloads() {
let sql_rows = vec![
std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("north".to_string())),
("arr".to_string(), Value::Number(120.into())),
]),
std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("south".to_string())),
("arr".to_string(), Value::Number(210.into())),
]),
std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("west".to_string())),
("arr".to_string(), Value::Number(330.into())),
]),
std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("east".to_string())),
("arr".to_string(), Value::Number(480.into())),
]),
];
let context = QueryContext::default();
let request = DataWindowRequest::new("tenant_rollup", 1, 2)
.with_query_token("q:m51")
.compact();
let singlestore = InMemorySingleStoreAdapter::new(sql_rows.clone());
let clickhouse = InMemoryClickHouseAdapter::new(sql_rows.clone());
let bigquery = InMemoryBigQueryAdapter::new(sql_rows.clone());
for response in [
singlestore
.run_high_volume_window_query(
SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
request.clone(),
&context,
)
.expect("singlestore window query"),
clickhouse
.run_high_volume_window_query(
SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
request.clone(),
&context,
)
.expect("clickhouse window query"),
bigquery
.run_high_volume_window_query(
SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
request.clone(),
&context,
)
.expect("bigquery window query"),
] {
assert_eq!(response.dataset, "tenant_rollup");
assert_eq!(response.rows.len(), 2);
assert_eq!(response.total_rows, 4);
assert_eq!(response.query_token, "q:m51");
let compact = response.compact_rows.expect("compact payload");
assert!(compact.columns.contains(&"tenant".to_string()));
assert!(compact.columns.contains(&"arr".to_string()));
assert_eq!(compact.rows.len(), response.rows.len());
}
}
#[test]
fn opensearch_window_search_uses_window_contract() {
let rows = vec![
crate::StoredRow {
id: 1,
data: std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("acme".to_string())),
("status".to_string(), Value::String("healthy".to_string())),
]),
},
crate::StoredRow {
id: 2,
data: std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("globex".to_string())),
("status".to_string(), Value::String("renewal".to_string())),
]),
},
crate::StoredRow {
id: 3,
data: std::collections::BTreeMap::from([
("tenant".to_string(), Value::String("initech".to_string())),
("status".to_string(), Value::String("at-risk".to_string())),
]),
},
];
let adapter = InMemoryOpenSearchAdapter::new(rows);
let response = adapter
.search_window(
SearchRequest::new("accounts", ""),
DataWindowRequest::new("accounts", 1, 2).compact(),
&QueryContext::default(),
)
.expect("opensearch window search");
assert_eq!(response.dataset, "accounts");
assert_eq!(response.offset, 1);
assert_eq!(response.limit, 2);
assert_eq!(response.total_rows, 3);
assert_eq!(response.rows.len(), 2);
assert!(response.compact_rows.is_some());
assert!(!response.window_token.is_empty());
}
#[test]
fn reference_axiom_sink_records_events() {
let sink = InMemoryAxiomSink::default();
sink.send_event(AnalyticsEvent::new(
"sales",
"query_executed",
json!({"latency_ms": 12}),
))
.unwrap();
let events = sink.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].name, "query_executed");
}
#[test]
fn reference_trigger_orchestrator_supports_completion_and_polling() {
let orchestrator = InMemoryJobOrchestrator::default();
let completed = Arc::new(Mutex::new(false));
let completed_flag = completed.clone();
orchestrator.register_completion_callback(Arc::new(move |status| {
if status.state == JobState::Succeeded {
if let Ok(mut guard) = completed_flag.lock() {
*guard = true;
}
}
}));
let handle = orchestrator
.enqueue(JobRequest::new(
"refresh_dashboard",
json!({"account_id": 7}),
"refresh-7",
))
.unwrap();
orchestrator
.mark_succeeded(&handle.id, json!({"rows_synced": 18}))
.unwrap();
let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
assert_eq!(status.state, JobState::Succeeded);
assert_eq!(
status.result,
Some(json!({
"rows_synced": 18
}))
);
assert!(*completed.lock().unwrap());
}
#[test]
fn query_context_helpers_encode_trace_correlation_retry_timeout() {
let context = QueryContext::default()
.with_correlation_id("corr-42")
.with_request_id("req-42")
.with_timeout_ms(900)
.with_retry_policy(RetryPolicy {
max_attempts: 4,
initial_backoff_ms: 25,
max_backoff_ms: 100,
});
assert_eq!(
context
.tags
.get(CONTEXT_TAG_CORRELATION_ID)
.map(String::as_str),
Some("corr-42")
);
assert_eq!(
context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
Some("req-42")
);
assert_eq!(
context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
Some("900")
);
assert_eq!(context.correlation_id(), Some("corr-42"));
assert_eq!(context.request_id(), Some("req-42"));
assert_eq!(context.timeout_ms(), Some(900));
assert_eq!(
context.retry_policy_override(),
Some(RetryPolicy {
max_attempts: 4,
initial_backoff_ms: 25,
max_backoff_ms: 100
})
);
}
#[test]
fn run_with_contract_respects_context_retry_override() {
let context = QueryContext::default().with_retry_policy(RetryPolicy {
max_attempts: 2,
initial_backoff_ms: 0,
max_backoff_ms: 0,
});
let mut attempts = 0u32;
let result = run_with_contract(
"opensearch",
"search",
AdapterCallContract::default()
.with_retry_policy(RetryPolicy::never())
.with_timeout_ms(1_000),
&context,
|attempt| {
attempts = attempt;
if attempt == 1 {
Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::Transient,
"temporary failure",
))
} else {
Ok("ok")
}
},
)
.unwrap();
assert_eq!(result, "ok");
assert_eq!(attempts, 2);
}
#[tokio::test]
async fn run_with_contract_async_respects_context_retry_override() {
let context = QueryContext::default().with_retry_policy(RetryPolicy {
max_attempts: 2,
initial_backoff_ms: 0,
max_backoff_ms: 0,
});
let attempts = Arc::new(Mutex::new(0u32));
let attempts_for_closure = attempts.clone();
let result = run_with_contract_async(
"opensearch",
"search_async",
AdapterCallContract::default()
.with_retry_policy(RetryPolicy::never())
.with_timeout_ms(1_000),
&context,
move |attempt| {
let attempts_for_step = attempts_for_closure.clone();
async move {
if let Ok(mut guard) = attempts_for_step.lock() {
*guard = attempt;
}
if attempt == 1 {
Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::Transient,
"temporary failure",
))
} else {
Ok("ok")
}
}
},
)
.await
.unwrap();
assert_eq!(result, "ok");
assert_eq!(*attempts.lock().unwrap(), 2);
}
#[tokio::test]
async fn run_with_retry_async_retries_transient_errors() {
let calls = Arc::new(Mutex::new(0u32));
let calls_for_closure = calls.clone();
let result = run_with_retry_async(RetryPolicy::conservative(), move |attempt| {
let calls_for_step = calls_for_closure.clone();
async move {
if let Ok(mut guard) = calls_for_step.lock() {
*guard = attempt;
}
if attempt < 3 {
Err(IntegrationError::new(
"bigquery",
IntegrationErrorKind::Transient,
"temporary failure",
))
} else {
Ok("ok")
}
}
})
.await
.unwrap();
assert_eq!(result, "ok");
assert_eq!(*calls.lock().unwrap(), 3);
}
#[test]
fn run_with_contract_returns_timeout_error() {
let context = QueryContext::default().with_timeout_ms(5);
let err = run_with_contract(
"singlestore",
"run_query",
AdapterCallContract::default(),
&context,
|_| {
std::thread::sleep(std::time::Duration::from_millis(15));
Ok::<_, IntegrationError>("done")
},
)
.unwrap_err();
assert_eq!(err.kind, IntegrationErrorKind::Timeout);
assert_eq!(err.code.as_deref(), Some("operation_timeout"));
}
#[test]
fn axiom_bridge_enriches_events_with_trace_and_correlation() {
let sink = Arc::new(InMemoryAxiomSink::default());
let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
let context = QueryContext {
tenant_id: Some("tenant-a".to_string()),
trace_id: Some("trace-123".to_string()),
tags: std::collections::BTreeMap::new(),
}
.with_correlation_id("corr-9")
.with_request_id("req-9");
bridge
.emit("session_event", json!({"event":"patch"}), &context)
.unwrap();
let events = sink.events();
assert_eq!(events.len(), 1);
let payload = events[0].payload.as_object().unwrap();
assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
}
#[test]
fn conformance_suite_passes_with_reference_adapters() {
let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
[("tenant".to_string(), Value::String("north".to_string()))],
)]);
let clickhouse =
InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
"events".to_string(),
Value::Number(99.into()),
)])]);
let bigquery = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
"accounts".to_string(),
Value::Number(33.into()),
)])]);
let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
id: 1,
data: std::collections::BTreeMap::from([(
"title".to_string(),
Value::String("Acme renewal".to_string()),
)]),
}]);
let trigger = InMemoryJobOrchestrator::default();
let analytics = InMemoryAxiomSink::default();
let report = run_adapter_conformance_suite(
&singlestore,
&clickhouse,
&bigquery,
&opensearch,
&trigger,
&analytics,
&QueryContext::default()
.with_correlation_id("corr-1")
.with_request_id("req-1"),
);
assert!(report.passed(), "report={report:?}");
assert!(report.checks.len() >= 8);
}
#[test]
fn adapter_error_paths_cover_invalid_inputs_and_failure_reporting() {
let context = QueryContext::default();
let empty_sql = SqlCommand::new(" ", Vec::new());
let singlestore = InMemorySingleStoreAdapter::default();
let clickhouse = InMemoryClickHouseAdapter::default();
let bigquery = InMemoryBigQueryAdapter::default();
let opensearch = InMemoryOpenSearchAdapter::default();
for err in [
singlestore
.run_query(empty_sql.clone(), &context)
.unwrap_err(),
clickhouse
.run_query(empty_sql.clone(), &context)
.unwrap_err(),
bigquery.run_query(empty_sql, &context).unwrap_err(),
] {
assert_eq!(err.kind, IntegrationErrorKind::InvalidInput);
assert_eq!(err.code.as_deref(), Some("empty_statement"));
}
let search_err = opensearch
.search(SearchRequest::new(" ", "needle"), &context)
.unwrap_err();
assert_eq!(search_err.kind, IntegrationErrorKind::InvalidInput);
assert_eq!(search_err.code.as_deref(), Some("empty_index"));
}
struct FailingSqlAdapter {
source: &'static str,
}
impl TypedQueryBoundary for FailingSqlAdapter {
type Request = SqlCommand;
type Response = Vec<crate::Row>;
fn execute(
&self,
_request: &Self::Request,
_context: &QueryContext,
) -> super::IntegrationResult<Self::Response> {
Err(IntegrationError::new(
self.source,
IntegrationErrorKind::Unavailable,
"backend unavailable",
)
.with_code("backend_down"))
}
}
impl SingleStoreAdapter for FailingSqlAdapter {}
impl ClickHouseAdapter for FailingSqlAdapter {}
impl BigQueryAdapter for FailingSqlAdapter {}
struct FailingSearchAdapter;
impl TypedQueryBoundary for FailingSearchAdapter {
type Request = SearchRequest;
type Response = SearchResponse;
fn execute(
&self,
_request: &Self::Request,
_context: &QueryContext,
) -> super::IntegrationResult<Self::Response> {
Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::Unavailable,
"search unavailable",
))
}
}
impl OpenSearchAdapter for FailingSearchAdapter {}
struct FailingTrigger;
impl TriggerDevAdapter for FailingTrigger {
fn trigger_workflow(&self, _request: JobRequest) -> super::IntegrationResult<JobHandle> {
Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"trigger unavailable",
))
}
fn workflow_status(&self, _id: &str) -> super::IntegrationResult<JobStatus> {
Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"trigger unavailable",
))
}
fn poll_workflow(
&self,
_id: &str,
_attempts: u32,
_backoff_ms: u64,
) -> super::IntegrationResult<JobStatus> {
Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"trigger unavailable",
))
}
}
#[derive(Default)]
struct FailingAnalyticsSink;
impl AnalyticsSink for FailingAnalyticsSink {
fn send_event(&self, _event: AnalyticsEvent) -> super::IntegrationResult<()> {
Err(IntegrationError::new(
"analytics",
IntegrationErrorKind::Unavailable,
"sink unavailable",
))
}
}
#[test]
fn conformance_suite_captures_failures_for_unavailable_dependencies() {
let failing_sql = FailingSqlAdapter { source: "sql" };
let failing_search = FailingSearchAdapter;
let failing_trigger = FailingTrigger;
let failing_analytics = FailingAnalyticsSink;
let report = run_adapter_conformance_suite(
&failing_sql,
&failing_sql,
&failing_sql,
&failing_search,
&failing_trigger,
&failing_analytics,
&QueryContext::default(),
);
assert!(!report.passed());
assert!(report
.checks
.iter()
.any(|check| { check.name == "singlestore.query" && !check.passed }));
assert!(report
.checks
.iter()
.any(|check| { check.name == "opensearch.search" && !check.passed }));
assert!(report
.checks
.iter()
.any(|check| { check.name == "trigger.enqueue" && !check.passed }));
assert!(report
.checks
.iter()
.any(|check| { check.name == "analytics.emit" && !check.passed }));
assert!(report
.checks
.iter()
.any(|check| { check.name == "context.correlation" && !check.passed }));
}
}