use crate::{
error::{DataError, DataResult},
query::Query,
repo::{Row, StoredRow},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{BTreeMap, HashMap},
fmt,
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use tracing::{info, warn};
// Well-known keys of `QueryContext::tags`. The `QueryContext` builder methods
// write these keys and its accessor methods read them back.

/// Tag key holding the correlation id of a call.
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
/// Tag key holding the request id of a call.
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
/// Tag key holding a per-call timeout override, stored as decimal milliseconds.
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
/// Tag key holding a per-call override of `RetryPolicy::max_attempts`.
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
/// Tag key holding a per-call override of `RetryPolicy::initial_backoff_ms`.
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
/// Tag key holding a per-call override of `RetryPolicy::max_backoff_ms`.
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
/// Classification attached to every [`IntegrationError`].
///
/// Serialized in `snake_case`. `IntegrationError::new` treats `Transient`,
/// `RateLimited`, `Timeout` and `Unavailable` as retryable; every other kind
/// is treated as non-retryable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
Transient,
Permanent,
Auth,
RateLimited,
Timeout,
Unavailable,
InvalidInput,
}
/// Error value returned by every integration boundary in this module.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
/// Name of the integration that produced the error; rendered in brackets by `Display`.
pub source: String,
/// Classification of the failure.
pub kind: IntegrationErrorKind,
/// Human-readable description.
pub message: String,
/// Optional machine-readable code; when present, `Display` renders it after `source`.
pub code: Option<String>,
/// Whether the retry helpers in this module may attempt the operation again.
pub retryable: bool,
}
impl IntegrationError {
    /// Creates an error without a code.
    ///
    /// `retryable` is derived from `kind`: transient, rate-limited, timeout and
    /// unavailable failures are retryable, everything else is not.
    pub fn new(
        source: impl Into<String>,
        kind: IntegrationErrorKind,
        message: impl Into<String>,
    ) -> Self {
        let retryable = match kind {
            IntegrationErrorKind::Transient
            | IntegrationErrorKind::RateLimited
            | IntegrationErrorKind::Timeout
            | IntegrationErrorKind::Unavailable => true,
            IntegrationErrorKind::Permanent
            | IntegrationErrorKind::Auth
            | IntegrationErrorKind::InvalidInput => false,
        };
        Self {
            source: source.into(),
            kind,
            message: message.into(),
            code: None,
            retryable,
        }
    }

    /// Returns the same error with `code` attached, replacing any previous code.
    pub fn with_code(self, code: impl Into<String>) -> Self {
        Self {
            code: Some(code.into()),
            ..self
        }
    }
}
impl fmt::Display for IntegrationError {
    /// Renders `[source:code] message` when a code is set, `[source] message` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.code.as_deref() {
            Some(code) => write!(f, "[{}:{}] {}", self.source, code, self.message),
            None => write!(f, "[{}] {}", self.source, self.message),
        }
    }
}
// Lets `IntegrationError` be used wherever a `std::error::Error` is expected.
impl std::error::Error for IntegrationError {}
/// Result alias used by every integration boundary; the error is always [`IntegrationError`].
pub type IntegrationResult<T> = Result<T, IntegrationError>;
/// Retry settings consumed by `run_with_retry` and `run_with_contract`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
/// Total number of attempts, including the first one; the retry helpers treat 0 as 1.
pub max_attempts: u32,
/// Sleep before the first retry, in milliseconds; 0 disables sleeping.
pub initial_backoff_ms: u64,
/// Cap applied when the backoff is doubled after each retry, in milliseconds.
pub max_backoff_ms: u64,
}
impl RetryPolicy {
    /// Three attempts, starting at 50 ms of backoff and capped at 500 ms.
    pub fn conservative() -> Self {
        Self::fixed(3, 50, 500)
    }

    /// A single attempt with no backoff, i.e. retries are disabled.
    pub fn never() -> Self {
        Self::fixed(1, 0, 0)
    }

    /// Assembles a policy from its three raw settings.
    fn fixed(max_attempts: u32, initial_backoff_ms: u64, max_backoff_ms: u64) -> Self {
        Self {
            max_attempts,
            initial_backoff_ms,
            max_backoff_ms,
        }
    }
}
impl Default for RetryPolicy {
fn default() -> Self {
Self::conservative()
}
}
/// Retry and timeout settings applied to one adapter call by `run_with_contract`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdapterCallContract {
/// Retry policy used unless the call's `QueryContext` carries an override.
pub retry_policy: RetryPolicy,
/// Time budget for a single attempt, in milliseconds.
pub timeout_ms: u64,
}
impl AdapterCallContract {
    /// Contract for latency-sensitive calls: no retries, 250 ms timeout.
    pub fn low_latency() -> Self {
        let retry_policy = RetryPolicy::never();
        Self {
            retry_policy,
            timeout_ms: 250,
        }
    }

    /// Contract for ordinary queries: conservative retries, 1 500 ms timeout.
    pub fn default_query() -> Self {
        let retry_policy = RetryPolicy::conservative();
        Self {
            retry_policy,
            timeout_ms: 1_500,
        }
    }

    /// Returns the contract with its retry policy replaced.
    pub fn with_retry_policy(self, retry_policy: RetryPolicy) -> Self {
        Self {
            retry_policy,
            ..self
        }
    }

    /// Returns the contract with its timeout replaced; a timeout of 0 is raised to 1 ms.
    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
        Self {
            timeout_ms: u64::max(timeout_ms, 1),
            ..self
        }
    }
}
impl Default for AdapterCallContract {
fn default() -> Self {
Self::default_query()
}
}
/// Runs `operation_fn` under a retry policy and a per-attempt time budget.
///
/// The retry policy and timeout come from `context` when it carries overrides
/// and from `contract` otherwise. `operation_fn` receives the 1-based attempt
/// number.
///
/// # Errors
///
/// * An attempt that takes longer than the timeout yields a `Timeout` error
///   with code `operation_timeout`, whatever the attempt itself returned; no
///   further attempts are made.
/// * A non-retryable error, or any error on the final attempt, is returned
///   unchanged.
///
/// Between retryable failures the function logs a warning and sleeps for the
/// current backoff, which doubles after every retry up to the policy's cap.
pub fn run_with_contract<T, F>(
    source: &str,
    operation: &str,
    contract: AdapterCallContract,
    context: &QueryContext,
    mut operation_fn: F,
) -> IntegrationResult<T>
where
    F: FnMut(u32) -> IntegrationResult<T>,
{
    let retry = match context.retry_policy_override() {
        Some(overridden) => overridden,
        None => contract.retry_policy,
    };
    let max_attempts = u32::max(retry.max_attempts, 1);
    let timeout_ms = match context.timeout_ms() {
        Some(limit) => limit,
        None => u64::max(contract.timeout_ms, 1),
    };
    let backoff_cap_ms = u64::max(retry.max_backoff_ms, 1);
    let mut backoff_ms = retry.initial_backoff_ms;

    for attempt in 1..=max_attempts {
        let clock = Instant::now();
        let outcome = operation_fn(attempt);
        let elapsed_ms = clock.elapsed().as_millis() as u64;

        // The time budget is checked first, so a slow success is still a timeout.
        if elapsed_ms > timeout_ms {
            let message = format!(
                "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
                elapsed_ms, timeout_ms
            );
            let timed_out = IntegrationError::new(source, IntegrationErrorKind::Timeout, message);
            return Err(timed_out.with_code("operation_timeout"));
        }

        let err = match outcome {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };
        if attempt == max_attempts || !err.retryable {
            return Err(err);
        }

        warn!(
            target: "shelly.integration.query",
            source,
            operation,
            attempt,
            max_attempts,
            tenant_id = ?context.tenant_id,
            trace_id = ?context.trace_id,
            correlation_id = context.correlation_id().unwrap_or("-"),
            request_id = context.request_id().unwrap_or("-"),
            timeout_ms,
            backoff_ms,
            error = %err,
            "Shelly integration retrying transient failure"
        );
        if backoff_ms != 0 {
            thread::sleep(Duration::from_millis(backoff_ms));
        }
        backoff_ms = u64::min(backoff_ms.saturating_mul(2), backoff_cap_ms);
    }

    // Not reached in practice: the final loop iteration always returns.
    let exhausted = IntegrationError::new(
        source,
        IntegrationErrorKind::Transient,
        format!("{operation} retry exhausted"),
    );
    Err(exhausted.with_code("retry_exhausted"))
}
/// Calls `operation` until it succeeds, fails with a non-retryable error, or
/// `policy.max_attempts` attempts (at least one) have been made.
///
/// `operation` receives the 1-based attempt number. After each retryable
/// failure the function sleeps for the current backoff (when non-zero) and
/// then doubles it, capped by the policy.
///
/// # Errors
///
/// Returns the operation's own error once it is non-retryable or the last
/// attempt has failed.
pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
where
    F: FnMut(u32) -> IntegrationResult<T>,
{
    let last_attempt = u32::max(policy.max_attempts, 1);
    let ceiling_ms = u64::max(policy.max_backoff_ms, 1);
    let mut delay_ms = policy.initial_backoff_ms;

    for attempt in 1..=last_attempt {
        let failure = match operation(attempt) {
            Ok(value) => return Ok(value),
            Err(failure) => failure,
        };
        if attempt == last_attempt || !failure.retryable {
            return Err(failure);
        }
        if delay_ms != 0 {
            thread::sleep(Duration::from_millis(delay_ms));
        }
        delay_ms = u64::min(delay_ms.saturating_mul(2), ceiling_ms);
    }

    // Not reached in practice: the final loop iteration always returns.
    Err(IntegrationError::new(
        "retry",
        IntegrationErrorKind::Transient,
        "retry exhausted",
    ))
}
/// Callback pair invoked around connection setup and teardown.
///
/// Both methods default to doing nothing and returning `Ok(())`.
pub trait ConnectionLifecycleHook: Send + Sync {
/// Called when the owning lifecycle connects.
fn on_connect(&self) -> IntegrationResult<()> {
Ok(())
}
/// Called when the owning lifecycle disconnects.
fn on_disconnect(&self) -> IntegrationResult<()> {
Ok(())
}
}
/// A component that accepts [`ConnectionLifecycleHook`]s and can be connected and disconnected.
pub trait ConnectionLifecycle {
/// Adds `hook` to the set of registered hooks.
fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
/// Performs the connect step.
fn connect(&self) -> IntegrationResult<()>;
/// Performs the disconnect step.
fn disconnect(&self) -> IntegrationResult<()>;
}
/// Ordered collection of lifecycle hooks; implements [`ConnectionLifecycle`] by fanning out to them.
#[derive(Default)]
pub struct LifecycleHooks {
// Hooks in registration order.
hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}
impl LifecycleHooks {
    /// Creates a collection with no hooks registered.
    pub fn new() -> Self {
        Self { hooks: Vec::new() }
    }
}
impl ConnectionLifecycle for LifecycleHooks {
    /// Appends `hook`; hooks run in registration order.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
        self.hooks.push(hook);
    }

    /// Runs every hook's `on_connect`, stopping at and returning the first error.
    fn connect(&self) -> IntegrationResult<()> {
        self.hooks.iter().try_for_each(|hook| hook.on_connect())
    }

    /// Runs every hook's `on_disconnect`, stopping at and returning the first error.
    fn disconnect(&self) -> IntegrationResult<()> {
        self.hooks.iter().try_for_each(|hook| hook.on_disconnect())
    }
}
/// Per-call metadata passed to every integration boundary.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
/// Tenant on whose behalf the call is made, when known.
pub tenant_id: Option<String>,
/// Trace identifier, when known.
pub trace_id: Option<String>,
/// Free-form string tags; the `CONTEXT_TAG_*` keys are interpreted by the accessor methods.
pub tags: BTreeMap<String, String>,
}
impl QueryContext {
    /// Returns the context with `key` set to `value`, replacing any previous value.
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.tags.insert(key.into(), value.into());
        self
    }

    /// Stores `correlation_id` under [`CONTEXT_TAG_CORRELATION_ID`].
    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
        self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id)
    }

    /// Stores `request_id` under [`CONTEXT_TAG_REQUEST_ID`].
    pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
        self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id)
    }

    /// Stores a per-call timeout override, in milliseconds.
    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
        self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
    }

    /// Stores a per-call retry policy override as three numeric tags.
    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
        self.with_tag(
            CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
            policy.max_attempts.to_string(),
        )
        .with_tag(
            CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
            policy.initial_backoff_ms.to_string(),
        )
        .with_tag(
            CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
            policy.max_backoff_ms.to_string(),
        )
    }

    /// Trimmed correlation id, or `None` when the tag is missing or blank.
    pub fn correlation_id(&self) -> Option<&str> {
        self.text_tag(CONTEXT_TAG_CORRELATION_ID)
    }

    /// Trimmed request id, or `None` when the tag is missing or blank.
    pub fn request_id(&self) -> Option<&str> {
        self.text_tag(CONTEXT_TAG_REQUEST_ID)
    }

    /// Timeout override in milliseconds.
    ///
    /// Returns `None` when the tag is missing, is not a valid `u64`, or is 0.
    /// Surrounding whitespace is ignored, matching the id accessors.
    pub fn timeout_ms(&self) -> Option<u64> {
        self.parsed_tag::<u64>(CONTEXT_TAG_TIMEOUT_MS)
            .filter(|value| *value > 0)
    }

    /// Retry policy override, present only when the max-attempts tag parses.
    ///
    /// A missing or invalid initial backoff defaults to 0; a missing or invalid
    /// max backoff defaults to the initial backoff. The result is normalized so
    /// that `max_attempts >= 1` and `max_backoff_ms >= initial_backoff_ms`.
    pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
        let max_attempts = self.parsed_tag::<u32>(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)?;
        let initial_backoff_ms = self
            .parsed_tag::<u64>(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
            .unwrap_or(0);
        let max_backoff_ms = self
            .parsed_tag::<u64>(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
            .unwrap_or(initial_backoff_ms);
        Some(RetryPolicy {
            max_attempts: max_attempts.max(1),
            initial_backoff_ms,
            max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
        })
    }

    /// Looks up `key` and returns its trimmed value when it is not blank.
    fn text_tag(&self, key: &str) -> Option<&str> {
        self.tags
            .get(key)
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
    }

    /// Looks up `key` and parses its trimmed value; `None` when missing, blank or unparsable.
    fn parsed_tag<T: std::str::FromStr>(&self, key: &str) -> Option<T> {
        self.text_tag(key)?.parse().ok()
    }
}
/// A boundary that turns one typed request into one typed response.
pub trait TypedQueryBoundary {
/// Request type accepted by [`execute`](Self::execute).
type Request: Send + Sync;
/// Response type produced by [`execute`](Self::execute).
type Response: Send + Sync;
/// Executes `request` with the per-call `context`.
fn execute(
&self,
request: &Self::Request,
context: &QueryContext,
) -> IntegrationResult<Self::Response>;
}
/// A SQL statement together with its JSON-valued parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
/// Statement text.
pub statement: String,
/// Parameters accompanying the statement.
pub params: Vec<Value>,
}
impl SqlCommand {
    /// Creates a command from its statement text and parameters.
    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
        let statement = statement.into();
        Self { statement, params }
    }
}
/// SQL adapter: a [`TypedQueryBoundary`] from [`SqlCommand`] to rows, plus a logging wrapper.
pub trait SingleStoreAdapter:
    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
{
    /// Executes `query` through [`TypedQueryBoundary::execute`] and logs the outcome.
    ///
    /// Success is logged at info level with the row count, failure at warn
    /// level with the error. The result of `execute` is returned unchanged.
    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
        let clock = Instant::now();
        let outcome = self.execute(&query, context);
        let sql_text: &str = &query.statement;
        let bound_params = query.params.len();
        match outcome.as_ref() {
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement = sql_text,
                param_count = bound_params,
                duration_ms = clock.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
            Ok(rows) => info!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement = sql_text,
                param_count = bound_params,
                row_count = rows.len(),
                duration_ms = clock.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
        }
        outcome
    }
}
/// Parameters of one search call.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
/// Name of the index to search.
pub index: String,
/// Free-text query.
pub text: String,
/// Structured filters accompanying the text query.
pub filters: Vec<crate::query::Filter>,
/// Page number; `SearchRequest::new` starts at 1.
pub page: usize,
/// Page size; `SearchRequest::new` uses 25.
pub per_page: usize,
}
impl SearchRequest {
    /// Creates a request for the first page (25 rows per page) with no filters.
    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
        const FIRST_PAGE: usize = 1;
        const DEFAULT_PAGE_SIZE: usize = 25;
        let index = index.into();
        let text = text.into();
        Self {
            index,
            text,
            filters: Vec::new(),
            page: FIRST_PAGE,
            per_page: DEFAULT_PAGE_SIZE,
        }
    }

    /// Returns the request with `filter` appended to its filter list.
    pub fn with_filter(self, filter: crate::query::Filter) -> Self {
        let mut request = self;
        request.filters.push(filter);
        request
    }
}
/// Result of one search call.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
/// Total number of hits reported by the adapter; may exceed `rows.len()` when results are paged.
pub total_hits: usize,
/// Rows returned for this call.
pub rows: Vec<StoredRow>,
}
/// Search adapter: a [`TypedQueryBoundary`] from [`SearchRequest`] to [`SearchResponse`],
/// plus a logging wrapper.
pub trait OpenSearchAdapter:
    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
{
    /// Executes `request` through [`TypedQueryBoundary::execute`] and logs the outcome.
    ///
    /// Success is logged at info level with row and hit counts, failure at warn
    /// level with the error. The result of `execute` is returned unchanged.
    fn search(
        &self,
        request: SearchRequest,
        context: &QueryContext,
    ) -> IntegrationResult<SearchResponse> {
        let clock = Instant::now();
        let outcome = self.execute(&request, context);
        let index_name: &str = &request.index;
        let query_text: &str = &request.text;
        let filter_total = request.filters.len();
        match outcome.as_ref() {
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index = index_name,
                text = query_text,
                filter_count = filter_total,
                page = request.page,
                per_page = request.per_page,
                duration_ms = clock.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
            Ok(response) => info!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index = index_name,
                text = query_text,
                filter_count = filter_total,
                page = request.page,
                per_page = request.per_page,
                row_count = response.rows.len(),
                total_hits = response.total_hits,
                duration_ms = clock.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
        }
        outcome
    }
}
/// Destination for analytics events.
pub trait AnalyticsSink: Send + Sync {
/// Delivers `event` to the sink.
fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
/// One analytics event handed to an [`AnalyticsSink`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
/// Namespace the event belongs to.
pub namespace: String,
/// Event name.
pub name: String,
/// Arbitrary JSON payload.
pub payload: Value,
}
impl AnalyticsEvent {
    /// Creates an event from its namespace, name and payload.
    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
        let namespace = namespace.into();
        let name = name.into();
        Self {
            namespace,
            name,
            payload,
        }
    }
}
/// Request to start a workflow job.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
/// Name of the workflow to run.
pub workflow: String,
/// Arbitrary JSON input for the workflow.
pub payload: Value,
/// Caller-supplied idempotency key; copied onto the resulting `JobHandle`.
pub idempotency_key: String,
/// Additional string metadata; empty when built with `JobRequest::new`.
pub metadata: BTreeMap<String, String>,
}
impl JobRequest {
    /// Creates a request with empty metadata.
    pub fn new(
        workflow: impl Into<String>,
        payload: Value,
        idempotency_key: impl Into<String>,
    ) -> Self {
        let workflow = workflow.into();
        let idempotency_key = idempotency_key.into();
        Self {
            workflow,
            payload,
            idempotency_key,
            metadata: BTreeMap::new(),
        }
    }
}
/// Lifecycle state of a job; serialized in `snake_case`.
///
/// The in-memory orchestrator in this file treats `Succeeded`, `Failed` and
/// `Canceled` as terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
Queued,
Running,
Succeeded,
Failed,
Canceled,
}
/// Snapshot of a job's progress.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
/// Job identifier.
pub id: String,
/// Current lifecycle state.
pub state: JobState,
/// Attempt counter maintained by the orchestrator.
pub attempts: u32,
/// Result payload, when the job produced one.
pub result: Option<Value>,
/// Failure details, when the job failed.
pub error: Option<IntegrationError>,
}
/// Handle returned when a job is enqueued.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
/// Identifier to pass to the status and poll calls.
pub id: String,
/// Workflow name copied from the request.
pub workflow: String,
/// Idempotency key copied from the request.
pub idempotency_key: String,
}
/// Shared, thread-safe callback that receives a [`JobStatus`].
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
/// Contract for enqueueing jobs and observing their progress.
pub trait JobOrchestrator: Send + Sync {
/// Submits `request` and returns a handle to the new job.
fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
/// Returns the current status of job `id`.
fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
/// Checks the status of job `id`, given a number of `attempts` and a
/// `backoff_ms` between them; how these are used is up to the implementation.
fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
/// Registers `callback` with the orchestrator.
fn register_completion_callback(&self, callback: JobCompletionCallback);
}
/// Workflow-oriented view of a job backend.
///
/// Every [`JobOrchestrator`] gets this trait through a blanket impl in this
/// file, which forwards each method to its orchestrator counterpart.
pub trait TriggerDevAdapter: Send + Sync {
/// Starts the workflow described by `request`.
fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
/// Returns the current status of workflow run `id`.
fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
/// Polls workflow run `id` with the given `attempts` and `backoff_ms`.
fn poll_workflow(
&self,
id: &str,
attempts: u32,
backoff_ms: u64,
) -> IntegrationResult<JobStatus>;
}
/// Blanket impl: every [`JobOrchestrator`] is usable as a [`TriggerDevAdapter`].
impl<T> TriggerDevAdapter for T
where
    T: JobOrchestrator + Send + Sync,
{
    /// Forwards to [`JobOrchestrator::enqueue`].
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
        <T as JobOrchestrator>::enqueue(self, request)
    }

    /// Forwards to [`JobOrchestrator::status`].
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
        <T as JobOrchestrator>::status(self, id)
    }

    /// Forwards to [`JobOrchestrator::poll`].
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus> {
        <T as JobOrchestrator>::poll(self, id, attempts, backoff_ms)
    }
}
#[derive(Clone)]
pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
sink: Arc<S>,
namespace: String,
}
impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
    /// Creates a bridge that sends events to `sink` under `namespace`.
    pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
        let namespace = namespace.into();
        Self { sink, namespace }
    }

    /// Wraps `payload` in an envelope carrying the context and sends it to the sink.
    ///
    /// The envelope is a JSON object with the keys `payload`, `tenant_id`,
    /// `trace_id`, `correlation_id`, `request_id` (each `null` when absent)
    /// and `tags`.
    pub fn emit(
        &self,
        name: impl Into<String>,
        payload: Value,
        context: &QueryContext,
    ) -> IntegrationResult<()> {
        /// Maps an optional string to a JSON string or `null`.
        fn string_or_null(value: Option<&str>) -> Value {
            match value {
                Some(text) => Value::String(text.to_string()),
                None => Value::Null,
            }
        }

        let mut envelope = serde_json::Map::new();
        envelope.insert("payload".to_string(), payload);
        envelope.insert(
            "tenant_id".to_string(),
            string_or_null(context.tenant_id.as_deref()),
        );
        envelope.insert(
            "trace_id".to_string(),
            string_or_null(context.trace_id.as_deref()),
        );
        envelope.insert(
            "correlation_id".to_string(),
            string_or_null(context.correlation_id()),
        );
        envelope.insert(
            "request_id".to_string(),
            string_or_null(context.request_id()),
        );
        envelope.insert("tags".to_string(), serde_json::json!(context.tags));

        let event = AnalyticsEvent::new(
            self.namespace.clone(),
            name.into(),
            Value::Object(envelope),
        );
        self.sink.send_event(event)
    }
}
/// In-memory [`SingleStoreAdapter`] that answers every valid statement with a fixed row set.
#[derive(Debug, Clone, Default)]
pub struct InMemorySingleStoreAdapter {
// Rows returned by every successful query.
rows: Vec<Row>,
}
impl InMemorySingleStoreAdapter {
pub fn new(rows: Vec<Row>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemorySingleStoreAdapter {
    type Request = SqlCommand;
    type Response = Vec<Row>;

    /// Returns a copy of the stored rows; the statement is not interpreted.
    ///
    /// # Errors
    ///
    /// A blank statement yields an `InvalidInput` error with code `empty_statement`.
    fn execute(
        &self,
        request: &Self::Request,
        _context: &QueryContext,
    ) -> IntegrationResult<Self::Response> {
        let statement_is_blank = request.statement.trim().is_empty();
        if !statement_is_blank {
            return Ok(self.rows.clone());
        }
        let rejection = IntegrationError::new(
            "singlestore",
            IntegrationErrorKind::InvalidInput,
            "empty SQL statement",
        );
        Err(rejection.with_code("empty_statement"))
    }
}
// Opts into the default `SingleStoreAdapter::run_query` logging wrapper.
impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
/// In-memory [`OpenSearchAdapter`] that searches a fixed set of rows.
#[derive(Debug, Clone, Default)]
pub struct InMemoryOpenSearchAdapter {
// Rows that searches are run against.
rows: Vec<StoredRow>,
}
impl InMemoryOpenSearchAdapter {
pub fn new(rows: Vec<StoredRow>) -> Self {
Self { rows }
}
}
impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
type Request = SearchRequest;
type Response = SearchResponse;
fn execute(
&self,
request: &Self::Request,
_context: &QueryContext,
) -> IntegrationResult<Self::Response> {
if request.index.trim().is_empty() {
return Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::InvalidInput,
"search index must not be empty",
)
.with_code("empty_index"));
}
let needle = request.text.trim().to_lowercase();
let mut filtered = self
.rows
.iter()
.filter(|row| {
if needle.is_empty() {
return true;
}
row.data.values().any(|value| {
value
.as_str()
.map(|text| text.to_lowercase().contains(&needle))
.unwrap_or(false)
})
})
.cloned()
.collect::<Vec<_>>();
let total_hits = filtered.len();
let page = request.page.max(1);
let per_page = request.per_page.max(1);
let offset = (page - 1) * per_page;
filtered = filtered.into_iter().skip(offset).take(per_page).collect();
Ok(SearchResponse {
total_hits,
rows: filtered,
})
}
}
// Opts into the default `OpenSearchAdapter::search` logging wrapper.
impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
/// In-memory [`AnalyticsSink`] that records every event it receives.
///
/// Clones share the same underlying event list.
#[derive(Debug, Default, Clone)]
pub struct InMemoryAxiomSink {
// Recorded events, in arrival order.
events: Arc<Mutex<Vec<AnalyticsEvent>>>,
}
impl InMemoryAxiomSink {
    /// Returns a copy of all recorded events, or an empty list when the lock is poisoned.
    pub fn events(&self) -> Vec<AnalyticsEvent> {
        let snapshot = match self.events.lock() {
            Ok(recorded) => recorded.clone(),
            Err(_) => Vec::new(),
        };
        snapshot
    }
}
impl AnalyticsSink for InMemoryAxiomSink {
    /// Appends `event` to the recorded list and logs the call at info level.
    ///
    /// # Errors
    ///
    /// Returns an `Unavailable` error when the event list lock is poisoned.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
        let clock = Instant::now();
        // Copied up front because `event` is moved into the list below.
        let (namespace, name) = (event.namespace.clone(), event.name.clone());
        match self.events.lock() {
            Ok(mut recorded) => recorded.push(event),
            Err(_) => {
                return Err(IntegrationError::new(
                    "axiom",
                    IntegrationErrorKind::Unavailable,
                    "analytics sink lock poisoned",
                ));
            }
        }
        info!(
            target: "shelly.integration.query",
            source = "axiom",
            operation = "send_event",
            namespace,
            event_name = name,
            duration_ms = clock.elapsed().as_millis() as u64,
            "Shelly integration call executed"
        );
        Ok(())
    }
}
/// In-memory [`JobOrchestrator`] that keeps job statuses in a map.
#[derive(Default)]
pub struct InMemoryJobOrchestrator {
// Status of every enqueued job, keyed by job id.
statuses: Mutex<HashMap<String, JobStatus>>,
// Callbacks invoked when a job is marked succeeded or failed.
callbacks: Mutex<Vec<JobCompletionCallback>>,
// Counter used to build job ids.
next_id: Mutex<u64>,
}
impl InMemoryJobOrchestrator {
pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
let status = self.with_status_mut(id, |status| {
status.state = JobState::Succeeded;
status.result = Some(result);
status.error = None;
})?;
self.notify(&status);
Ok(())
}
pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
let status = self.with_status_mut(id, |status| {
status.state = JobState::Failed;
status.result = None;
status.error = Some(error);
})?;
self.notify(&status);
Ok(())
}
fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
where
F: FnOnce(&mut JobStatus),
{
let mut statuses = self.statuses.lock().map_err(|_| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"job status lock poisoned",
)
})?;
let Some(status) = statuses.get_mut(id) else {
return Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::InvalidInput,
"job not found",
)
.with_code("job_not_found"));
};
status.attempts = status.attempts.saturating_add(1);
update(status);
Ok(status.clone())
}
fn notify(&self, status: &JobStatus) {
if let Ok(callbacks) = self.callbacks.lock() {
for callback in callbacks.iter() {
callback(status);
}
}
}
fn is_terminal(state: JobState) -> bool {
matches!(
state,
JobState::Succeeded | JobState::Failed | JobState::Canceled
)
}
}
impl JobOrchestrator for InMemoryJobOrchestrator {
    /// Stores a new job in the `Queued` state and returns its handle.
    ///
    /// Job ids have the form `job-N`, where `N` comes from an internal counter.
    ///
    /// # Errors
    ///
    /// Returns an `Unavailable` error when the id or status lock is poisoned.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
        let started_at = Instant::now();
        // The id lock is held only while the counter is advanced.
        let id = {
            let mut next_id = self.next_id.lock().map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job id lock poisoned",
                )
            })?;
            *next_id = next_id.saturating_add(1);
            format!("job-{}", *next_id)
        };
        let status = JobStatus {
            id: id.clone(),
            state: JobState::Queued,
            attempts: 0,
            result: None,
            error: None,
        };
        self.statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .insert(id.clone(), status);
        let handle = JobHandle {
            id,
            workflow: request.workflow,
            idempotency_key: request.idempotency_key,
        };
        info!(
            target: "shelly.integration.query",
            source = "trigger",
            operation = "enqueue",
            workflow = handle.workflow.as_str(),
            idempotency_key = handle.idempotency_key.as_str(),
            job_id = handle.id.as_str(),
            duration_ms = started_at.elapsed().as_millis() as u64,
            "Shelly integration call executed"
        );
        Ok(handle)
    }

    /// Returns a copy of the stored status of job `id` and logs the outcome.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` with code `job_not_found` for an unknown id, or
    /// `Unavailable` when the status lock is poisoned.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
        let started_at = Instant::now();
        let result = self
            .statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .get(id)
            .cloned()
            .ok_or_else(|| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::InvalidInput,
                    "job not found",
                )
                .with_code("job_not_found")
            });
        match &result {
            Ok(status) => info!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                job_state = ?status.state,
                attempts = status.attempts,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration call executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration call failed"
            ),
        }
        result
    }

    /// Checks the status of job `id` up to `attempts` times (at least once).
    ///
    /// Returns as soon as the job is in a terminal state, or with the last
    /// observed status once the attempts are used up. Sleeps `backoff_ms`
    /// between checks when it is non-zero.
    ///
    /// # Errors
    ///
    /// Any error from [`status`](JobOrchestrator::status) ends the poll; it is
    /// logged at warn level and returned unchanged.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
        let started_at = Instant::now();
        let max_polls = attempts.max(1);
        // Cannot overflow: the loop returns once `current` reaches `max_polls`.
        let mut current = 0u32;
        loop {
            current += 1;
            let status = match self.status(id) {
                Ok(status) => status,
                Err(err) => {
                    warn!(
                        target: "shelly.integration.query",
                        source = "trigger",
                        operation = "poll",
                        job_id = id,
                        attempts = current,
                        backoff_ms,
                        duration_ms = started_at.elapsed().as_millis() as u64,
                        error = %err,
                        "Shelly integration call failed"
                    );
                    return Err(err);
                }
            };
            let terminal = Self::is_terminal(status.state);
            if terminal || current >= max_polls {
                info!(
                    target: "shelly.integration.query",
                    source = "trigger",
                    operation = "poll",
                    job_id = id,
                    attempts = current,
                    backoff_ms,
                    terminal,
                    job_state = ?status.state,
                    duration_ms = started_at.elapsed().as_millis() as u64,
                    "Shelly integration call executed"
                );
                return Ok(status);
            }
            if backoff_ms > 0 {
                thread::sleep(Duration::from_millis(backoff_ms));
            }
        }
    }

    /// Adds `callback` to the list notified when a job is marked succeeded or failed.
    ///
    /// Registration is best-effort: when the callback lock is poisoned the
    /// callback is dropped and a warning is logged.
    fn register_completion_callback(&self, callback: JobCompletionCallback) {
        if let Ok(mut callbacks) = self.callbacks.lock() {
            callbacks.push(callback);
            info!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "register_completion_callback",
                callback_count = callbacks.len(),
                "Shelly integration callback registered"
            );
        } else {
            warn!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "register_completion_callback",
                "Shelly integration callback registration failed"
            );
        }
    }
}
/// Outcome of one named conformance check.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AdapterConformanceCheck {
/// Check name, e.g. `singlestore.query`.
pub name: String,
/// Whether the check passed.
pub passed: bool,
/// Free-form detail: result summary on success, error text on failure.
pub detail: String,
}
/// Collected outcomes of a conformance run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct AdapterConformanceReport {
/// Checks in the order they were recorded.
pub checks: Vec<AdapterConformanceCheck>,
}
impl AdapterConformanceReport {
    /// Whether every recorded check passed; an empty report counts as passed.
    pub fn passed(&self) -> bool {
        !self.checks.iter().any(|check| !check.passed)
    }

    /// Records a passing check.
    pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
        self.record(name.into(), true, detail.into());
    }

    /// Records a failing check.
    pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
        self.record(name.into(), false, detail.into());
    }

    /// Appends one check outcome to the report.
    fn record(&mut self, name: String, passed: bool, detail: String) {
        self.checks.push(AdapterConformanceCheck {
            name,
            passed,
            detail,
        });
    }
}
/// Sends one probe through each adapter and reports which probes succeeded.
///
/// Checks recorded, in order: `singlestore.query`, `opensearch.search`,
/// `trigger.enqueue` (followed by `trigger.status` only when the enqueue
/// succeeded) and `analytics.emit`. A failing probe is recorded with the
/// error text as its detail; the function itself never fails.
pub fn run_adapter_conformance_suite(
    singlestore: &dyn SingleStoreAdapter,
    opensearch: &dyn OpenSearchAdapter,
    trigger: &dyn TriggerDevAdapter,
    analytics: &dyn AnalyticsSink,
    context: &QueryContext,
) -> AdapterConformanceReport {
    let mut report = AdapterConformanceReport::default();

    // SQL probe.
    let sql_probe = SqlCommand::new(
        "SELECT tenant, active_accounts FROM tenant_rollup",
        Vec::new(),
    );
    match singlestore.run_query(sql_probe, context) {
        Err(err) => report.add_fail("singlestore.query", err.to_string()),
        Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
    }

    // Search probe.
    let search_probe = SearchRequest::new("accounts", "");
    match opensearch.search(search_probe, context) {
        Err(err) => report.add_fail("opensearch.search", err.to_string()),
        Ok(response) => {
            let detail = format!(
                "rows={} total_hits={}",
                response.rows.len(),
                response.total_hits
            );
            report.add_pass("opensearch.search", detail);
        }
    }

    // Job probe: the status check only runs for a job that was accepted.
    let job_probe = JobRequest::new(
        "adapter_conformance_probe",
        serde_json::json!({"probe": true}),
        "adapter-conformance-probe",
    );
    match trigger.trigger_workflow(job_probe) {
        Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
        Ok(handle) => {
            report.add_pass("trigger.enqueue", format!("id={}", handle.id));
            match trigger.workflow_status(&handle.id) {
                Err(err) => report.add_fail("trigger.status", err.to_string()),
                Ok(status) => {
                    let detail = format!("state={:?} attempts={}", status.state, status.attempts);
                    report.add_pass("trigger.status", detail);
                }
            }
        }
    }

    // Analytics probe.
    let probe_payload = serde_json::json!({
        "trace_id": context.trace_id,
        "correlation_id": context.correlation_id(),
        "request_id": context.request_id(),
    });
    let analytics_event = AnalyticsEvent::new("adapter_conformance", "probe", probe_payload);
    match analytics.send_event(analytics_event) {
        Err(err) => report.add_fail("analytics.emit", err.to_string()),
        Ok(()) => report.add_pass("analytics.emit", "event accepted"),
    }

    report
}
/// Converts an [`IntegrationError`] into `DataError::Integration`, prefixing
/// the error's display text with `[source]`.
pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
    let origin: String = source.into();
    let rendered = format!("[{}] {}", origin, err);
    DataError::Integration(rendered)
}
/// Converts an [`IntegrationResult`] into a `DataResult`, mapping the error
/// through [`map_integration_error`] and passing success values through.
pub fn map_integration_result<T>(
    source: impl Into<String>,
    result: IntegrationResult<T>,
) -> DataResult<T> {
    match result {
        Ok(value) => Ok(value),
        Err(err) => Err(map_integration_error(source, err)),
    }
}
/// Builds a [`Query`] from a search request: pagination first, then one
/// `where_filter` call per request filter, in order.
pub fn query_from_search(request: &SearchRequest) -> Query {
    let paged = Query::new().paginate(request.page, request.per_page);
    request
        .filters
        .iter()
        .fold(paged, |query, filter| query.where_filter(filter.clone()))
}
#[cfg(test)]
mod tests {
use super::{
map_integration_result, run_adapter_conformance_suite, run_with_contract, run_with_retry,
AdapterCallContract, AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge,
ConnectionLifecycle, ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
SingleStoreAdapter, SqlCommand, CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID,
CONTEXT_TAG_TIMEOUT_MS,
};
use serde_json::{json, Value};
use std::sync::{Arc, Mutex};
struct CountingHook {
connects: Arc<Mutex<u32>>,
disconnects: Arc<Mutex<u32>>,
}
impl ConnectionLifecycleHook for CountingHook {
fn on_connect(&self) -> super::IntegrationResult<()> {
let mut guard = self.connects.lock().unwrap();
*guard += 1;
Ok(())
}
fn on_disconnect(&self) -> super::IntegrationResult<()> {
let mut guard = self.disconnects.lock().unwrap();
*guard += 1;
Ok(())
}
}
#[test]
fn lifecycle_hooks_are_called() {
let connects = Arc::new(Mutex::new(0));
let disconnects = Arc::new(Mutex::new(0));
let mut lifecycle = LifecycleHooks::new();
lifecycle.register_hook(Arc::new(CountingHook {
connects: connects.clone(),
disconnects: disconnects.clone(),
}));
lifecycle.connect().unwrap();
lifecycle.disconnect().unwrap();
assert_eq!(*connects.lock().unwrap(), 1);
assert_eq!(*disconnects.lock().unwrap(), 1);
}
#[test]
fn retry_policy_retries_transient_errors() {
let mut calls = 0u32;
let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
calls = attempt;
if attempt < 3 {
Err(IntegrationError::new(
"opensearch",
IntegrationErrorKind::Transient,
"temporary failure",
))
} else {
Ok("ok")
}
})
.unwrap();
assert_eq!(result, "ok");
assert_eq!(calls, 3);
}
#[test]
fn retry_policy_stops_on_permanent_errors() {
let mut calls = 0u32;
let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
calls = attempt;
Err::<(), IntegrationError>(IntegrationError::new(
"singlestore",
IntegrationErrorKind::Permanent,
"invalid sql",
))
})
.unwrap_err();
assert_eq!(calls, 1);
assert_eq!(err.kind, IntegrationErrorKind::Permanent);
}
#[test]
fn integration_result_maps_into_data_error() {
let mapped = map_integration_result::<()>(
"trigger",
Err(IntegrationError::new(
"trigger",
IntegrationErrorKind::Unavailable,
"service unavailable",
)),
)
.unwrap_err();
assert!(mapped.to_string().contains("service unavailable"));
}
#[derive(Default)]
struct InMemoryJobs {
statuses: Arc<Mutex<Vec<JobStatus>>>,
callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
}
impl JobOrchestrator for InMemoryJobs {
fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
let id = format!("job-{}", request.idempotency_key);
self.statuses.lock().unwrap().push(JobStatus {
id: id.clone(),
state: JobState::Queued,
attempts: 0,
result: None,
error: None,
});
Ok(JobHandle {
id,
workflow: request.workflow,
idempotency_key: request.idempotency_key,
})
}
fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
self.statuses
.lock()
.unwrap()
.iter()
.find(|status| status.id == id)
.cloned()
.ok_or_else(|| {
IntegrationError::new(
"trigger",
IntegrationErrorKind::InvalidInput,
"job not found",
)
})
}
fn poll(
&self,
id: &str,
_attempts: u32,
_backoff_ms: u64,
) -> super::IntegrationResult<JobStatus> {
self.status(id)
}
fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
self.callbacks.lock().unwrap().push(callback);
}
}
#[test]
fn job_orchestration_contract_supports_enqueue_and_status() {
let jobs = InMemoryJobs::default();
let handle = jobs
.enqueue(JobRequest::new(
"sync_customer",
json!({"id": 42}),
"idempotent-42",
))
.unwrap();
let status = jobs.status(&handle.id).unwrap();
assert_eq!(status.state, JobState::Queued);
assert_eq!(handle.idempotency_key, "idempotent-42");
let ctx = QueryContext {
tenant_id: Some("tenant-a".to_string()),
..QueryContext::default()
};
assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
}
#[test]
fn reference_singlestore_adapter_runs_typed_sql_boundary() {
let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
"region".to_string(),
Value::String("EMEA".to_string()),
)])]);
let rows = adapter
.run_query(
SqlCommand::new("SELECT region FROM accounts", Vec::new()),
&QueryContext::default(),
)
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0].get("region"),
Some(&Value::String("EMEA".to_string()))
);
}
#[test]
fn reference_opensearch_adapter_filters_rows() {
let rows = vec![
crate::StoredRow {
id: 1,
data: std::collections::BTreeMap::from([(
"title".to_string(),
Value::String("Acme renewal".to_string()),
)]),
},
crate::StoredRow {
id: 2,
data: std::collections::BTreeMap::from([(
"title".to_string(),
Value::String("Globex onboarding".to_string()),
)]),
},
];
let adapter = InMemoryOpenSearchAdapter::new(rows);
let response = adapter
.search(
SearchRequest::new("accounts", "renewal"),
&QueryContext::default(),
)
.unwrap();
assert_eq!(response.total_hits, 1);
assert_eq!(response.rows[0].id, 1);
}
/// Sends one analytics event to the in-memory Axiom sink and checks that the
/// sink reports exactly that event back.
#[test]
fn reference_axiom_sink_records_events() {
    let sink = InMemoryAxiomSink::default();
    let event = AnalyticsEvent::new("sales", "query_executed", json!({"latency_ms": 12}));
    sink.send_event(event).unwrap();

    let recorded = sink.events();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0].name, "query_executed");
}
/// Registers a completion callback, enqueues a job, marks it succeeded, and
/// checks that polling reports the success with its result payload and that the
/// callback observed a `Succeeded` status.
#[test]
fn reference_trigger_orchestrator_supports_completion_and_polling() {
    let orchestrator = InMemoryJobOrchestrator::default();

    // Shared flag that the callback flips once it sees a succeeded job.
    let completed = Arc::new(Mutex::new(false));
    let completed_flag = Arc::clone(&completed);
    orchestrator.register_completion_callback(Arc::new(move |status| {
        if status.state != JobState::Succeeded {
            return;
        }
        // Best effort: a failed lock leaves the flag untouched.
        if let Ok(mut seen) = completed_flag.lock() {
            *seen = true;
        }
    }));

    let request = JobRequest::new(
        "refresh_dashboard",
        json!({"account_id": 7}),
        "refresh-7",
    );
    let handle = orchestrator.enqueue(request).unwrap();
    orchestrator
        .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
        .unwrap();

    let polled = orchestrator.poll(&handle.id, 2, 0).unwrap();
    assert_eq!(polled.state, JobState::Succeeded);
    let expected_result = Some(json!({
        "rows_synced": 18
    }));
    assert_eq!(polled.result, expected_result);
    assert!(*completed.lock().unwrap());
}
/// Builds a context through the `with_*` helpers and checks that the
/// correlation id, request id, and timeout are stored as tags and that the
/// typed accessors, including the retry-policy override, read them back.
#[test]
fn query_context_helpers_encode_trace_correlation_retry_timeout() {
    // Built twice (once as input, once as expectation), so keep one recipe.
    let retry_policy = || RetryPolicy {
        max_attempts: 4,
        initial_backoff_ms: 25,
        max_backoff_ms: 100,
    };
    let context = QueryContext::default()
        .with_correlation_id("corr-42")
        .with_request_id("req-42")
        .with_timeout_ms(900)
        .with_retry_policy(retry_policy());

    // Raw tag view: each helper stores its value as a string.
    for (tag, expected) in [
        (CONTEXT_TAG_CORRELATION_ID, "corr-42"),
        (CONTEXT_TAG_REQUEST_ID, "req-42"),
        (CONTEXT_TAG_TIMEOUT_MS, "900"),
    ] {
        assert_eq!(
            context.tags.get(tag).map(String::as_str),
            Some(expected),
            "unexpected value for tag {tag}"
        );
    }

    // Typed accessor view.
    assert_eq!(context.correlation_id(), Some("corr-42"));
    assert_eq!(context.request_id(), Some("req-42"));
    assert_eq!(context.timeout_ms(), Some(900));
    assert_eq!(context.retry_policy_override(), Some(retry_policy()));
}
/// Gives the contract a never-retry policy while the context asks for two
/// attempts, fails the first attempt with a transient error, and checks that
/// the call still succeeds on attempt 2.
#[test]
fn run_with_contract_respects_context_retry_override() {
    let override_policy = RetryPolicy {
        max_attempts: 2,
        initial_backoff_ms: 0,
        max_backoff_ms: 0,
    };
    let context = QueryContext::default().with_retry_policy(override_policy);
    let contract = AdapterCallContract::default()
        .with_retry_policy(RetryPolicy::never())
        .with_timeout_ms(1_000);

    // Records the attempt number most recently handed to the operation.
    let mut attempts: u32 = 0;
    let result = run_with_contract("opensearch", "search", contract, &context, |attempt| {
        attempts = attempt;
        match attempt {
            1 => Err(IntegrationError::new(
                "opensearch",
                IntegrationErrorKind::Transient,
                "temporary failure",
            )),
            _ => Ok("ok"),
        }
    })
    .unwrap();

    assert_eq!(result, "ok");
    assert_eq!(attempts, 2);
}
/// Runs an operation that sleeps for 15 ms under a 5 ms context timeout and
/// checks that the outcome is a `Timeout` error with code `operation_timeout`.
#[test]
fn run_with_contract_returns_timeout_error() {
    let context = QueryContext::default().with_timeout_ms(5);
    let contract = AdapterCallContract::default();

    let outcome = run_with_contract("singlestore", "run_query", contract, &context, |_| {
        // Sleep well past the 5 ms budget before reporting success.
        std::thread::sleep(std::time::Duration::from_millis(15));
        Ok::<_, IntegrationError>("done")
    });

    let err = outcome.unwrap_err();
    assert_eq!(err.kind, IntegrationErrorKind::Timeout);
    assert_eq!(err.code.as_deref(), Some("operation_timeout"));
}
/// Emits one event through the telemetry bridge and checks that the payload
/// recorded by the sink carries the context's trace, correlation, and request
/// ids.
#[test]
fn axiom_bridge_enriches_events_with_trace_and_correlation() {
    let sink = Arc::new(InMemoryAxiomSink::default());
    let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");

    let base = QueryContext {
        tenant_id: Some(String::from("tenant-a")),
        trace_id: Some(String::from("trace-123")),
        tags: std::collections::BTreeMap::new(),
    };
    let context = base
        .with_correlation_id("corr-9")
        .with_request_id("req-9");

    bridge
        .emit("session_event", json!({"event":"patch"}), &context)
        .unwrap();

    let recorded = sink.events();
    assert_eq!(recorded.len(), 1);
    let payload = recorded[0].payload.as_object().unwrap();
    for (field, expected) in [
        ("trace_id", "trace-123"),
        ("correlation_id", "corr-9"),
        ("request_id", "req-9"),
    ] {
        assert_eq!(payload.get(field), Some(&json!(expected)));
    }
}
/// Runs the adapter conformance suite against the four in-memory reference
/// adapters and checks that the report passes and lists at least four checks.
#[test]
fn conformance_suite_passes_with_reference_adapters() {
    let sql_seed = std::collections::BTreeMap::from([(
        String::from("tenant"),
        Value::String(String::from("north")),
    )]);
    let singlestore = InMemorySingleStoreAdapter::new(vec![sql_seed]);

    let search_seed = crate::StoredRow {
        id: 1,
        data: std::collections::BTreeMap::from([(
            String::from("title"),
            Value::String(String::from("Acme renewal")),
        )]),
    };
    let opensearch = InMemoryOpenSearchAdapter::new(vec![search_seed]);

    let trigger = InMemoryJobOrchestrator::default();
    let analytics = InMemoryAxiomSink::default();
    let context = QueryContext::default();

    let report =
        run_adapter_conformance_suite(&singlestore, &opensearch, &trigger, &analytics, &context);

    assert!(report.passed(), "report={report:?}");
    assert!(report.checks.len() >= 4);
}
}