Skip to main content

shelly_data/
integrations.rs

1use crate::{
2    error::{DataError, DataResult},
3    query::{Query, WireFormatProfile},
4    repo::{CompactRowsPayload, Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    collections::{BTreeMap, HashMap},
10    fmt,
11    future::Future,
12    sync::{Arc, Mutex},
13    thread,
14    time::{Duration, Instant},
15};
16use tokio::time::sleep;
17use tracing::{info, warn};
18
/// Tag key under which [`QueryContext`] stores the correlation id.
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
/// Tag key under which [`QueryContext`] stores the request id.
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
/// Tag key holding the per-call timeout override in milliseconds, stored as a decimal string.
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
/// Tag key holding the retry `max_attempts` override, stored as a decimal string.
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
/// Tag key holding the retry `initial_backoff_ms` override, stored as a decimal string.
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
/// Tag key holding the retry `max_backoff_ms` override, stored as a decimal string.
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
25
/// Classification of an integration failure.
///
/// Serialized in `snake_case`. [`IntegrationError::new`] marks `Transient`,
/// `RateLimited`, `Timeout` and `Unavailable` as retryable; the remaining
/// variants are marked non-retryable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    /// Retryable by default.
    Transient,
    /// Not retryable by default.
    Permanent,
    /// Not retryable by default.
    Auth,
    /// Retryable by default.
    RateLimited,
    /// Retryable by default; also produced by the contract runners when an
    /// attempt exceeds its timeout.
    Timeout,
    /// Retryable by default.
    Unavailable,
    /// Not retryable by default.
    InvalidInput,
}
37
/// Error value returned by every integration boundary in this module.
///
/// `Display` renders it as `[source] message`, or `[source:code] message`
/// when a code is attached.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
    /// Label of the component that raised the error; printed first by `Display`.
    pub source: String,
    /// Failure classification.
    pub kind: IntegrationErrorKind,
    /// Human-readable description.
    pub message: String,
    /// Optional machine-readable code, attached with [`IntegrationError::with_code`].
    pub code: Option<String>,
    /// Whether the retry helpers may try again after this error.
    /// [`IntegrationError::new`] derives it from `kind`; being a public field,
    /// it can be changed afterwards.
    pub retryable: bool,
}
46
47impl IntegrationError {
48    pub fn new(
49        source: impl Into<String>,
50        kind: IntegrationErrorKind,
51        message: impl Into<String>,
52    ) -> Self {
53        let kind_value = kind;
54        Self {
55            source: source.into(),
56            kind: kind_value,
57            message: message.into(),
58            code: None,
59            retryable: matches!(
60                kind_value,
61                IntegrationErrorKind::Transient
62                    | IntegrationErrorKind::RateLimited
63                    | IntegrationErrorKind::Timeout
64                    | IntegrationErrorKind::Unavailable
65            ),
66        }
67    }
68
69    pub fn with_code(mut self, code: impl Into<String>) -> Self {
70        self.code = Some(code.into());
71        self
72    }
73}
74
75impl fmt::Display for IntegrationError {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        if let Some(code) = &self.code {
78            write!(f, "[{}:{}] {}", self.source, code, self.message)
79        } else {
80            write!(f, "[{}] {}", self.source, self.message)
81        }
82    }
83}
84
/// Lets [`IntegrationError`] be used wherever a standard error is expected.
/// The impl is empty, so the trait's default methods apply.
impl std::error::Error for IntegrationError {}
86
/// Result alias used by every integration boundary: `Ok(T)` or an [`IntegrationError`].
pub type IntegrationResult<T> = Result<T, IntegrationError>;
88
/// Retry budget and backoff bounds consumed by the `run_with_*` helpers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Total number of attempts, first call included. The helpers treat `0` as `1`.
    pub max_attempts: u32,
    /// Starting backoff delay in milliseconds; the helpers double it after each retry.
    pub initial_backoff_ms: u64,
    /// Upper bound, in milliseconds, applied when the delay is doubled.
    pub max_backoff_ms: u64,
}
95
96impl RetryPolicy {
97    pub fn conservative() -> Self {
98        Self {
99            max_attempts: 3,
100            initial_backoff_ms: 50,
101            max_backoff_ms: 500,
102        }
103    }
104
105    pub fn never() -> Self {
106        Self {
107            max_attempts: 1,
108            initial_backoff_ms: 0,
109            max_backoff_ms: 0,
110        }
111    }
112}
113
114impl Default for RetryPolicy {
115    fn default() -> Self {
116        Self::conservative()
117    }
118}
119
/// Per-call execution contract for an adapter: how often to retry and how long
/// a single attempt may take.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdapterCallContract {
    /// Retry policy used unless the [`QueryContext`] carries an override.
    pub retry_policy: RetryPolicy,
    /// Per-attempt timeout in milliseconds, used unless the [`QueryContext`]
    /// carries a `timeout_ms` tag.
    pub timeout_ms: u64,
}
125
126impl AdapterCallContract {
127    pub fn low_latency() -> Self {
128        Self {
129            retry_policy: RetryPolicy::never(),
130            timeout_ms: 250,
131        }
132    }
133
134    pub fn default_query() -> Self {
135        Self {
136            retry_policy: RetryPolicy::conservative(),
137            timeout_ms: 1_500,
138        }
139    }
140
141    pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
142        self.retry_policy = retry_policy;
143        self
144    }
145
146    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
147        self.timeout_ms = timeout_ms.max(1);
148        self
149    }
150}
151
152impl Default for AdapterCallContract {
153    fn default() -> Self {
154        Self::default_query()
155    }
156}
157
158pub fn run_with_contract<T, F>(
159    source: &str,
160    operation: &str,
161    contract: AdapterCallContract,
162    context: &QueryContext,
163    mut operation_fn: F,
164) -> IntegrationResult<T>
165where
166    F: FnMut(u32) -> IntegrationResult<T>,
167{
168    let policy = context
169        .retry_policy_override()
170        .unwrap_or(contract.retry_policy);
171    let attempts = policy.max_attempts.max(1);
172    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
173    let mut backoff_ms = policy.initial_backoff_ms;
174
175    for attempt in 1..=attempts {
176        let started_at = Instant::now();
177        let result = operation_fn(attempt);
178        let elapsed_ms = started_at.elapsed().as_millis() as u64;
179        if elapsed_ms > timeout_ms {
180            return Err(IntegrationError::new(
181                source,
182                IntegrationErrorKind::Timeout,
183                format!(
184                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
185                    elapsed_ms, timeout_ms
186                ),
187            )
188            .with_code("operation_timeout"));
189        }
190
191        match result {
192            Ok(value) => return Ok(value),
193            Err(err) if !err.retryable || attempt == attempts => return Err(err),
194            Err(err) => {
195                warn!(
196                    target: "shelly.integration.query",
197                    source,
198                    operation,
199                    attempt,
200                    max_attempts = attempts,
201                    tenant_id = ?context.tenant_id,
202                    trace_id = ?context.trace_id,
203                    correlation_id = context.correlation_id().unwrap_or("-"),
204                    request_id = context.request_id().unwrap_or("-"),
205                    timeout_ms,
206                    backoff_ms,
207                    error = %err,
208                    "Shelly integration retrying transient failure without blocking sleep; use run_with_contract_async for non-blocking backoff"
209                );
210                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
211            }
212        }
213    }
214
215    Err(IntegrationError::new(
216        source,
217        IntegrationErrorKind::Transient,
218        format!("{operation} retry exhausted"),
219    )
220    .with_code("retry_exhausted"))
221}
222
223pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
224where
225    F: FnMut(u32) -> IntegrationResult<T>,
226{
227    let attempts = policy.max_attempts.max(1);
228    let mut backoff_ms = policy.initial_backoff_ms;
229
230    for attempt in 1..=attempts {
231        match operation(attempt) {
232            Ok(value) => return Ok(value),
233            Err(err) if !err.retryable || attempt == attempts => return Err(err),
234            Err(_) => {
235                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
236            }
237        }
238    }
239
240    Err(IntegrationError::new(
241        "retry",
242        IntegrationErrorKind::Transient,
243        "retry exhausted",
244    ))
245}
246
247pub async fn run_with_contract_async<T, F, Fut>(
248    source: &str,
249    operation: &str,
250    contract: AdapterCallContract,
251    context: &QueryContext,
252    mut operation_fn: F,
253) -> IntegrationResult<T>
254where
255    F: FnMut(u32) -> Fut,
256    Fut: Future<Output = IntegrationResult<T>>,
257{
258    let policy = context
259        .retry_policy_override()
260        .unwrap_or(contract.retry_policy);
261    let attempts = policy.max_attempts.max(1);
262    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
263    let mut backoff_ms = policy.initial_backoff_ms;
264
265    for attempt in 1..=attempts {
266        let started_at = Instant::now();
267        let result = operation_fn(attempt).await;
268        let elapsed_ms = started_at.elapsed().as_millis() as u64;
269        if elapsed_ms > timeout_ms {
270            return Err(IntegrationError::new(
271                source,
272                IntegrationErrorKind::Timeout,
273                format!(
274                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
275                    elapsed_ms, timeout_ms
276                ),
277            )
278            .with_code("operation_timeout"));
279        }
280
281        match result {
282            Ok(value) => return Ok(value),
283            Err(err) if !err.retryable || attempt == attempts => return Err(err),
284            Err(err) => {
285                warn!(
286                    target: "shelly.integration.query",
287                    source,
288                    operation,
289                    attempt,
290                    max_attempts = attempts,
291                    tenant_id = ?context.tenant_id,
292                    trace_id = ?context.trace_id,
293                    correlation_id = context.correlation_id().unwrap_or("-"),
294                    request_id = context.request_id().unwrap_or("-"),
295                    timeout_ms,
296                    backoff_ms,
297                    error = %err,
298                    "Shelly integration retrying transient failure with non-blocking backoff"
299                );
300                if backoff_ms > 0 {
301                    sleep(Duration::from_millis(backoff_ms)).await;
302                }
303                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
304            }
305        }
306    }
307
308    Err(IntegrationError::new(
309        source,
310        IntegrationErrorKind::Transient,
311        format!("{operation} retry exhausted"),
312    )
313    .with_code("retry_exhausted"))
314}
315
316pub async fn run_with_retry_async<T, F, Fut>(
317    policy: RetryPolicy,
318    mut operation: F,
319) -> IntegrationResult<T>
320where
321    F: FnMut(u32) -> Fut,
322    Fut: Future<Output = IntegrationResult<T>>,
323{
324    let attempts = policy.max_attempts.max(1);
325    let mut backoff_ms = policy.initial_backoff_ms;
326
327    for attempt in 1..=attempts {
328        match operation(attempt).await {
329            Ok(value) => return Ok(value),
330            Err(err) if !err.retryable || attempt == attempts => return Err(err),
331            Err(_) => {
332                if backoff_ms > 0 {
333                    sleep(Duration::from_millis(backoff_ms)).await;
334                }
335                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
336            }
337        }
338    }
339
340    Err(IntegrationError::new(
341        "retry",
342        IntegrationErrorKind::Transient,
343        "retry exhausted",
344    ))
345}
346
/// Callback pair invoked by a [`ConnectionLifecycle`] implementation.
///
/// Both methods default to a no-op that returns `Ok(())`, so implementors
/// override only the side they care about.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Called when the owning lifecycle connects. Default: succeed without doing anything.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    /// Called when the owning lifecycle disconnects. Default: succeed without doing anything.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}
356
/// Something that owns connect/disconnect hooks and can fire them.
pub trait ConnectionLifecycle {
    /// Adds a hook to be invoked by later `connect` / `disconnect` calls.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    /// Fires the connect side of the registered hooks.
    fn connect(&self) -> IntegrationResult<()>;
    /// Fires the disconnect side of the registered hooks.
    fn disconnect(&self) -> IntegrationResult<()>;
}
362
/// Ordered collection of lifecycle hooks; implements [`ConnectionLifecycle`].
#[derive(Default)]
pub struct LifecycleHooks {
    /// Hooks in registration order.
    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}
367
368impl LifecycleHooks {
369    pub fn new() -> Self {
370        Self::default()
371    }
372}
373
374impl ConnectionLifecycle for LifecycleHooks {
375    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
376        self.hooks.push(hook);
377    }
378
379    fn connect(&self) -> IntegrationResult<()> {
380        for hook in &self.hooks {
381            hook.on_connect()?;
382        }
383        Ok(())
384    }
385
386    fn disconnect(&self) -> IntegrationResult<()> {
387        for hook in &self.hooks {
388            hook.on_disconnect()?;
389        }
390        Ok(())
391    }
392}
393
/// Per-request metadata passed alongside every integration call.
///
/// Besides the two dedicated ids, free-form `tags` carry the correlation id,
/// request id, timeout override and retry-policy override under the
/// `CONTEXT_TAG_*` keys.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    /// Tenant the request belongs to, if known.
    pub tenant_id: Option<String>,
    /// Trace identifier, if any; included in this module's log records.
    pub trace_id: Option<String>,
    /// String key/value tags, ordered by key.
    pub tags: BTreeMap<String, String>,
}
400
401impl QueryContext {
402    pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
403        let tenant_id = tenant_id.into();
404        let tenant_id = tenant_id.trim();
405        if tenant_id.is_empty() {
406            self.tenant_id = None;
407        } else {
408            self.tenant_id = Some(tenant_id.to_string());
409        }
410        self
411    }
412
413    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
414        self.tags.insert(key.into(), value.into());
415        self
416    }
417
418    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
419        self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
420    }
421
422    pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
423        self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
424    }
425
426    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
427        self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
428    }
429
430    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
431        self.with_tag(
432            CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
433            policy.max_attempts.to_string(),
434        )
435        .with_tag(
436            CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
437            policy.initial_backoff_ms.to_string(),
438        )
439        .with_tag(
440            CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
441            policy.max_backoff_ms.to_string(),
442        )
443    }
444
445    pub fn correlation_id(&self) -> Option<&str> {
446        self.tags
447            .get(CONTEXT_TAG_CORRELATION_ID)
448            .map(String::as_str)
449            .map(str::trim)
450            .filter(|value| !value.is_empty())
451    }
452
453    pub fn request_id(&self) -> Option<&str> {
454        self.tags
455            .get(CONTEXT_TAG_REQUEST_ID)
456            .map(String::as_str)
457            .map(str::trim)
458            .filter(|value| !value.is_empty())
459    }
460
461    pub fn timeout_ms(&self) -> Option<u64> {
462        self.tags
463            .get(CONTEXT_TAG_TIMEOUT_MS)
464            .and_then(|value| value.parse::<u64>().ok())
465            .filter(|value| *value > 0)
466    }
467
468    pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
469        let max_attempts = self
470            .tags
471            .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
472            .and_then(|value| value.parse::<u32>().ok())?;
473        let initial_backoff_ms = self
474            .tags
475            .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
476            .and_then(|value| value.parse::<u64>().ok())
477            .unwrap_or(0);
478        let max_backoff_ms = self
479            .tags
480            .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
481            .and_then(|value| value.parse::<u64>().ok())
482            .unwrap_or(initial_backoff_ms);
483        Some(RetryPolicy {
484            max_attempts: max_attempts.max(1),
485            initial_backoff_ms,
486            max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
487        })
488    }
489}
490
/// Typed request/response boundary that concrete adapters implement.
///
/// The adapter traits in this module (`SingleStoreAdapter`, `ClickHouseAdapter`,
/// `BigQueryAdapter`, `OpenSearchAdapter`) build their logging wrappers on top
/// of [`TypedQueryBoundary::execute`].
pub trait TypedQueryBoundary {
    /// Request type accepted by the backend.
    type Request: Send + Sync;
    /// Response type produced by the backend.
    type Response: Send + Sync;

    /// Executes `request` with the given per-request `context`.
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
501
/// A SQL statement together with its parameter values.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
    /// Statement text. The adapter wrappers in this module log it verbatim.
    pub statement: String,
    /// Parameter values as JSON; the adapter wrappers log only how many there are.
    pub params: Vec<Value>,
}
507
508impl SqlCommand {
509    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
510        Self {
511            statement: statement.into(),
512            params,
513        }
514    }
515}
516
/// Request for one window (offset + limit) of a dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowRequest {
    /// Name of the dataset being windowed.
    pub dataset: String,
    /// Zero-based row offset of the window.
    pub offset: usize,
    /// Window size; [`DataWindowRequest::new`] raises `0` to `1`.
    pub limit: usize,
    /// Caller-supplied query token; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_token: Option<String>,
    /// Caller-supplied window token; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub window_token: Option<String>,
    /// Requested wire format; falls back to `WireFormatProfile::default()` when
    /// absent during deserialization.
    #[serde(default)]
    pub wire_format: WireFormatProfile,
}
529
530impl DataWindowRequest {
531    pub fn new(dataset: impl Into<String>, offset: usize, limit: usize) -> Self {
532        Self {
533            dataset: dataset.into(),
534            offset,
535            limit: limit.max(1),
536            query_token: None,
537            window_token: None,
538            wire_format: WireFormatProfile::Json,
539        }
540    }
541
542    pub fn with_query_token(mut self, query_token: impl Into<String>) -> Self {
543        self.query_token = Some(query_token.into());
544        self
545    }
546
547    pub fn with_window_token(mut self, window_token: impl Into<String>) -> Self {
548        self.window_token = Some(window_token.into());
549        self
550    }
551
552    pub fn compact(mut self) -> Self {
553        self.wire_format = WireFormatProfile::Compact;
554        self
555    }
556}
557
/// One window of rows returned for a [`DataWindowRequest`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowResponse {
    /// Dataset the window belongs to.
    pub dataset: String,
    /// Offset of the window.
    pub offset: usize,
    /// Window size that was applied.
    pub limit: usize,
    /// Total number of rows available for the query.
    pub total_rows: usize,
    /// Token identifying the query.
    pub query_token: String,
    /// Token identifying this window.
    pub window_token: String,
    /// Rows of the window.
    pub rows: Vec<Row>,
    /// Compact encoding of `rows`, when one was produced; omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compact_rows: Option<CompactRowsPayload>,
}
570
571pub trait SingleStoreAdapter:
572    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
573{
574    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
575        let started_at = Instant::now();
576        let statement = query.statement.as_str();
577        let param_count = query.params.len();
578        let result = self.execute(&query, context);
579        match &result {
580            Ok(rows) => info!(
581                target: "shelly.integration.query",
582                source = "singlestore",
583                operation = "run_query",
584                tenant_id = ?context.tenant_id,
585                trace_id = ?context.trace_id,
586                tag_count = context.tags.len(),
587                statement,
588                param_count,
589                row_count = rows.len(),
590                duration_ms = started_at.elapsed().as_millis() as u64,
591                "Shelly integration query executed"
592            ),
593            Err(err) => warn!(
594                target: "shelly.integration.query",
595                source = "singlestore",
596                operation = "run_query",
597                tenant_id = ?context.tenant_id,
598                trace_id = ?context.trace_id,
599                tag_count = context.tags.len(),
600                statement,
601                param_count,
602                duration_ms = started_at.elapsed().as_millis() as u64,
603                error = %err,
604                "Shelly integration query failed"
605            ),
606        }
607        result
608    }
609
610    fn run_high_volume_window_query(
611        &self,
612        query: SqlCommand,
613        request: DataWindowRequest,
614        context: &QueryContext,
615    ) -> IntegrationResult<DataWindowResponse> {
616        let rows = self.run_query(query, context)?;
617        Ok(materialize_window_response(
618            "singlestore",
619            request,
620            rows,
621            None,
622        ))
623    }
624}
625
626pub trait ClickHouseAdapter:
627    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
628{
629    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
630        let started_at = Instant::now();
631        let statement = query.statement.as_str();
632        let param_count = query.params.len();
633        let result = self.execute(&query, context);
634        match &result {
635            Ok(rows) => info!(
636                target: "shelly.integration.query",
637                source = "clickhouse",
638                operation = "run_query",
639                tenant_id = ?context.tenant_id,
640                trace_id = ?context.trace_id,
641                correlation_id = context.correlation_id().unwrap_or("-"),
642                request_id = context.request_id().unwrap_or("-"),
643                tag_count = context.tags.len(),
644                statement,
645                param_count,
646                row_count = rows.len(),
647                duration_ms = started_at.elapsed().as_millis() as u64,
648                "Shelly integration query executed"
649            ),
650            Err(err) => warn!(
651                target: "shelly.integration.query",
652                source = "clickhouse",
653                operation = "run_query",
654                tenant_id = ?context.tenant_id,
655                trace_id = ?context.trace_id,
656                correlation_id = context.correlation_id().unwrap_or("-"),
657                request_id = context.request_id().unwrap_or("-"),
658                tag_count = context.tags.len(),
659                statement,
660                param_count,
661                duration_ms = started_at.elapsed().as_millis() as u64,
662                error = %err,
663                "Shelly integration query failed"
664            ),
665        }
666        result
667    }
668
669    fn run_high_volume_window_query(
670        &self,
671        query: SqlCommand,
672        request: DataWindowRequest,
673        context: &QueryContext,
674    ) -> IntegrationResult<DataWindowResponse> {
675        let rows = self.run_query(query, context)?;
676        Ok(materialize_window_response(
677            "clickhouse",
678            request,
679            rows,
680            None,
681        ))
682    }
683}
684
685pub trait BigQueryAdapter:
686    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
687{
688    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
689        let started_at = Instant::now();
690        let statement = query.statement.as_str();
691        let param_count = query.params.len();
692        let result = self.execute(&query, context);
693        match &result {
694            Ok(rows) => info!(
695                target: "shelly.integration.query",
696                source = "bigquery",
697                operation = "run_query",
698                tenant_id = ?context.tenant_id,
699                trace_id = ?context.trace_id,
700                correlation_id = context.correlation_id().unwrap_or("-"),
701                request_id = context.request_id().unwrap_or("-"),
702                tag_count = context.tags.len(),
703                statement,
704                param_count,
705                row_count = rows.len(),
706                duration_ms = started_at.elapsed().as_millis() as u64,
707                "Shelly integration query executed"
708            ),
709            Err(err) => warn!(
710                target: "shelly.integration.query",
711                source = "bigquery",
712                operation = "run_query",
713                tenant_id = ?context.tenant_id,
714                trace_id = ?context.trace_id,
715                correlation_id = context.correlation_id().unwrap_or("-"),
716                request_id = context.request_id().unwrap_or("-"),
717                tag_count = context.tags.len(),
718                statement,
719                param_count,
720                duration_ms = started_at.elapsed().as_millis() as u64,
721                error = %err,
722                "Shelly integration query failed"
723            ),
724        }
725        result
726    }
727
728    fn run_high_volume_window_query(
729        &self,
730        query: SqlCommand,
731        request: DataWindowRequest,
732        context: &QueryContext,
733    ) -> IntegrationResult<DataWindowResponse> {
734        let rows = self.run_query(query, context)?;
735        Ok(materialize_window_response("bigquery", request, rows, None))
736    }
737}
738
/// Full-text search request against a single index.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    /// Index to search.
    pub index: String,
    /// Query text. The `OpenSearchAdapter::search` wrapper logs it verbatim.
    pub text: String,
    /// Filters attached to the request.
    pub filters: Vec<crate::query::Filter>,
    /// Page number; [`SearchRequest::new`] starts at `1` and
    /// `OpenSearchAdapter::search_window` computes it as 1-based.
    pub page: usize,
    /// Page size; [`SearchRequest::new`] defaults it to `25`.
    pub per_page: usize,
}
747
748impl SearchRequest {
749    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
750        Self {
751            index: index.into(),
752            text: text.into(),
753            filters: Vec::new(),
754            page: 1,
755            per_page: 25,
756        }
757    }
758
759    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
760        self.filters.push(filter);
761        self
762    }
763}
764
/// Result of a search: the rows returned plus the total hit count.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Total number of matching documents; `search_window` reports it as `total_rows`.
    pub total_hits: usize,
    /// Rows returned for the request.
    pub rows: Vec<StoredRow>,
}
770
771pub trait OpenSearchAdapter:
772    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse> + Send + Sync
773{
774    fn search(
775        &self,
776        request: SearchRequest,
777        context: &QueryContext,
778    ) -> IntegrationResult<SearchResponse> {
779        let started_at = Instant::now();
780        let index = request.index.as_str();
781        let text = request.text.as_str();
782        let filter_count = request.filters.len();
783        let page = request.page;
784        let per_page = request.per_page;
785        let result = self.execute(&request, context);
786        match &result {
787            Ok(response) => info!(
788                target: "shelly.integration.query",
789                source = "opensearch",
790                operation = "search",
791                tenant_id = ?context.tenant_id,
792                trace_id = ?context.trace_id,
793                tag_count = context.tags.len(),
794                index,
795                text,
796                filter_count,
797                page,
798                per_page,
799                row_count = response.rows.len(),
800                total_hits = response.total_hits,
801                duration_ms = started_at.elapsed().as_millis() as u64,
802                "Shelly integration query executed"
803            ),
804            Err(err) => warn!(
805                target: "shelly.integration.query",
806                source = "opensearch",
807                operation = "search",
808                tenant_id = ?context.tenant_id,
809                trace_id = ?context.trace_id,
810                tag_count = context.tags.len(),
811                index,
812                text,
813                filter_count,
814                page,
815                per_page,
816                duration_ms = started_at.elapsed().as_millis() as u64,
817                error = %err,
818                "Shelly integration query failed"
819            ),
820        }
821        result
822    }
823
824    fn search_window(
825        &self,
826        mut request: SearchRequest,
827        window: DataWindowRequest,
828        context: &QueryContext,
829    ) -> IntegrationResult<DataWindowResponse> {
830        let limit = window.limit.max(1);
831        request.page = (window.offset / limit).saturating_add(1);
832        request.per_page = limit;
833        let response = self.search(request, context)?;
834        let rows = response
835            .rows
836            .into_iter()
837            .map(|row| row.data)
838            .collect::<Vec<_>>();
839        let query_token = window
840            .query_token
841            .unwrap_or_else(|| format!("opensearch:{}:{}", window.dataset, response.total_hits));
842        let window_token = window.window_token.unwrap_or_else(|| {
843            format!(
844                "{query_token}:offset={}:limit={limit}:rows={}",
845                window.offset,
846                rows.len()
847            )
848        });
849        let compact_rows =
850            (window.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
851        Ok(DataWindowResponse {
852            dataset: window.dataset,
853            offset: window.offset,
854            limit,
855            total_rows: response.total_hits,
856            query_token,
857            window_token,
858            rows,
859            compact_rows,
860        })
861    }
862}
863
/// Destination that accepts analytics events.
pub trait AnalyticsSink: Send + Sync {
    /// Delivers one event to the sink.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
867
/// A named analytics event with a JSON payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
    /// Namespace the event belongs to.
    pub namespace: String,
    /// Event name.
    pub name: String,
    /// Arbitrary JSON payload.
    pub payload: Value,
}
874
875impl AnalyticsEvent {
876    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
877        Self {
878            namespace: namespace.into(),
879            name: name.into(),
880            payload,
881        }
882    }
883}
884
/// Request to run a workflow as a background job.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
    /// Name of the workflow to run.
    pub workflow: String,
    /// JSON payload handed to the workflow.
    pub payload: Value,
    /// Caller-chosen idempotency key. How duplicates are handled is up to the
    /// orchestrator implementation.
    pub idempotency_key: String,
    /// String key/value metadata; empty when built with [`JobRequest::new`].
    pub metadata: BTreeMap<String, String>,
}
892
893impl JobRequest {
894    pub fn new(
895        workflow: impl Into<String>,
896        payload: Value,
897        idempotency_key: impl Into<String>,
898    ) -> Self {
899        Self {
900            workflow: workflow.into(),
901            payload,
902            idempotency_key: idempotency_key.into(),
903            metadata: BTreeMap::new(),
904        }
905    }
906}
907
/// Lifecycle state of a job; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}
917
/// Snapshot of a job as reported by a [`JobOrchestrator`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    /// Job identifier.
    pub id: String,
    /// Current lifecycle state.
    pub state: JobState,
    /// Attempt counter reported by the orchestrator.
    pub attempts: u32,
    /// JSON result, if the orchestrator recorded one.
    pub result: Option<Value>,
    /// Error, if the orchestrator recorded one.
    pub error: Option<IntegrationError>,
}
926
/// Handle returned when a job is enqueued.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    /// Job identifier; presumably the `id` accepted by `status` / `poll` —
    /// confirm against the orchestrator implementation.
    pub id: String,
    /// Workflow name.
    pub workflow: String,
    /// Idempotency key.
    pub idempotency_key: String,
}
933
/// Shared, thread-safe callback that receives a [`JobStatus`]; registered via
/// [`JobOrchestrator::register_completion_callback`].
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
935
/// Backend capable of running jobs and reporting on them.
pub trait JobOrchestrator: Send + Sync {
    /// Submits a job and returns its handle.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the current status of job `id`.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Polls job `id`. Presumably checks up to `attempts` times, waiting
    /// `backoff_ms` between checks — confirm against the implementation.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    /// Registers a callback that receives job statuses; when it fires is up to
    /// the implementation.
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
942
/// Workflow-oriented view of a job backend.
///
/// A blanket impl in this module provides it for every [`JobOrchestrator`] by
/// delegating to `enqueue`, `status` and `poll`.
pub trait TriggerDevAdapter: Send + Sync {
    /// Starts a workflow and returns its job handle.
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the current status of workflow job `id`.
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Polls workflow job `id` with the given attempt count and backoff.
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus>;
}
953
954impl<T> TriggerDevAdapter for T
955where
956    T: JobOrchestrator + Send + Sync,
957{
958    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
959        self.enqueue(request)
960    }
961
962    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
963        self.status(id)
964    }
965
966    fn poll_workflow(
967        &self,
968        id: &str,
969        attempts: u32,
970        backoff_ms: u64,
971    ) -> IntegrationResult<JobStatus> {
972        self.poll(id, attempts, backoff_ms)
973    }
974}
975
976#[derive(Clone)]
977pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
978    sink: Arc<S>,
979    namespace: String,
980}
981
982impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
983    pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
984        Self {
985            sink,
986            namespace: namespace.into(),
987        }
988    }
989
990    pub fn emit(
991        &self,
992        name: impl Into<String>,
993        payload: Value,
994        context: &QueryContext,
995    ) -> IntegrationResult<()> {
996        let mut envelope = serde_json::Map::new();
997        envelope.insert("payload".to_string(), payload);
998        envelope.insert(
999            "tenant_id".to_string(),
1000            context
1001                .tenant_id
1002                .as_ref()
1003                .map(|value| Value::String(value.clone()))
1004                .unwrap_or(Value::Null),
1005        );
1006        envelope.insert(
1007            "trace_id".to_string(),
1008            context
1009                .trace_id
1010                .as_ref()
1011                .map(|value| Value::String(value.clone()))
1012                .unwrap_or(Value::Null),
1013        );
1014        envelope.insert(
1015            "correlation_id".to_string(),
1016            context
1017                .correlation_id()
1018                .map(|value| Value::String(value.to_string()))
1019                .unwrap_or(Value::Null),
1020        );
1021        envelope.insert(
1022            "request_id".to_string(),
1023            context
1024                .request_id()
1025                .map(|value| Value::String(value.to_string()))
1026                .unwrap_or(Value::Null),
1027        );
1028        envelope.insert("tags".to_string(), serde_json::json!(context.tags));
1029
1030        self.sink.send_event(AnalyticsEvent::new(
1031            self.namespace.clone(),
1032            name.into(),
1033            Value::Object(envelope),
1034        ))
1035    }
1036}
1037
1038#[derive(Debug, Clone, Default)]
1039pub struct InMemorySingleStoreAdapter {
1040    rows: Vec<Row>,
1041}
1042
1043impl InMemorySingleStoreAdapter {
1044    pub fn new(rows: Vec<Row>) -> Self {
1045        Self { rows }
1046    }
1047}
1048
1049impl TypedQueryBoundary for InMemorySingleStoreAdapter {
1050    type Request = SqlCommand;
1051    type Response = Vec<Row>;
1052
1053    fn execute(
1054        &self,
1055        request: &Self::Request,
1056        _context: &QueryContext,
1057    ) -> IntegrationResult<Self::Response> {
1058        if request.statement.trim().is_empty() {
1059            return Err(IntegrationError::new(
1060                "singlestore",
1061                IntegrationErrorKind::InvalidInput,
1062                "empty SQL statement",
1063            )
1064            .with_code("empty_statement"));
1065        }
1066        Ok(self.rows.clone())
1067    }
1068}
1069
1070impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
1071
1072#[derive(Debug, Clone, Default)]
1073pub struct InMemoryClickHouseAdapter {
1074    rows: Vec<Row>,
1075}
1076
1077impl InMemoryClickHouseAdapter {
1078    pub fn new(rows: Vec<Row>) -> Self {
1079        Self { rows }
1080    }
1081}
1082
1083impl TypedQueryBoundary for InMemoryClickHouseAdapter {
1084    type Request = SqlCommand;
1085    type Response = Vec<Row>;
1086
1087    fn execute(
1088        &self,
1089        request: &Self::Request,
1090        _context: &QueryContext,
1091    ) -> IntegrationResult<Self::Response> {
1092        if request.statement.trim().is_empty() {
1093            return Err(IntegrationError::new(
1094                "clickhouse",
1095                IntegrationErrorKind::InvalidInput,
1096                "empty SQL statement",
1097            )
1098            .with_code("empty_statement"));
1099        }
1100        Ok(self.rows.clone())
1101    }
1102}
1103
1104impl ClickHouseAdapter for InMemoryClickHouseAdapter {}
1105
1106#[derive(Debug, Clone, Default)]
1107pub struct InMemoryBigQueryAdapter {
1108    rows: Vec<Row>,
1109}
1110
1111impl InMemoryBigQueryAdapter {
1112    pub fn new(rows: Vec<Row>) -> Self {
1113        Self { rows }
1114    }
1115}
1116
1117impl TypedQueryBoundary for InMemoryBigQueryAdapter {
1118    type Request = SqlCommand;
1119    type Response = Vec<Row>;
1120
1121    fn execute(
1122        &self,
1123        request: &Self::Request,
1124        _context: &QueryContext,
1125    ) -> IntegrationResult<Self::Response> {
1126        if request.statement.trim().is_empty() {
1127            return Err(IntegrationError::new(
1128                "bigquery",
1129                IntegrationErrorKind::InvalidInput,
1130                "empty SQL statement",
1131            )
1132            .with_code("empty_statement"));
1133        }
1134        Ok(self.rows.clone())
1135    }
1136}
1137
1138impl BigQueryAdapter for InMemoryBigQueryAdapter {}
1139
1140#[derive(Debug, Clone, Default)]
1141pub struct InMemoryOpenSearchAdapter {
1142    rows: Vec<StoredRow>,
1143}
1144
1145impl InMemoryOpenSearchAdapter {
1146    pub fn new(rows: Vec<StoredRow>) -> Self {
1147        Self { rows }
1148    }
1149}
1150
1151impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
1152    type Request = SearchRequest;
1153    type Response = SearchResponse;
1154
1155    fn execute(
1156        &self,
1157        request: &Self::Request,
1158        _context: &QueryContext,
1159    ) -> IntegrationResult<Self::Response> {
1160        if request.index.trim().is_empty() {
1161            return Err(IntegrationError::new(
1162                "opensearch",
1163                IntegrationErrorKind::InvalidInput,
1164                "search index must not be empty",
1165            )
1166            .with_code("empty_index"));
1167        }
1168        let needle = request.text.trim().to_lowercase();
1169        let mut filtered = self
1170            .rows
1171            .iter()
1172            .filter(|row| {
1173                if needle.is_empty() {
1174                    return true;
1175                }
1176                row.data.values().any(|value| {
1177                    value
1178                        .as_str()
1179                        .map(|text| text.to_lowercase().contains(&needle))
1180                        .unwrap_or(false)
1181                })
1182            })
1183            .cloned()
1184            .collect::<Vec<_>>();
1185        let total_hits = filtered.len();
1186        let page = request.page.max(1);
1187        let per_page = request.per_page.max(1);
1188        let offset = (page - 1) * per_page;
1189        filtered = filtered.into_iter().skip(offset).take(per_page).collect();
1190        Ok(SearchResponse {
1191            total_hits,
1192            rows: filtered,
1193        })
1194    }
1195}
1196
1197impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
1198
1199#[derive(Debug, Default, Clone)]
1200pub struct InMemoryAxiomSink {
1201    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
1202}
1203
1204impl InMemoryAxiomSink {
1205    pub fn events(&self) -> Vec<AnalyticsEvent> {
1206        self.events
1207            .lock()
1208            .map(|events| events.clone())
1209            .unwrap_or_default()
1210    }
1211}
1212
1213impl AnalyticsSink for InMemoryAxiomSink {
1214    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
1215        let started_at = Instant::now();
1216        let namespace = event.namespace.clone();
1217        let name = event.name.clone();
1218        self.events
1219            .lock()
1220            .map_err(|_| {
1221                IntegrationError::new(
1222                    "axiom",
1223                    IntegrationErrorKind::Unavailable,
1224                    "analytics sink lock poisoned",
1225                )
1226            })?
1227            .push(event);
1228        info!(
1229            target: "shelly.integration.query",
1230            source = "axiom",
1231            operation = "send_event",
1232            namespace,
1233            event_name = name,
1234            duration_ms = started_at.elapsed().as_millis() as u64,
1235            "Shelly integration call executed"
1236        );
1237        Ok(())
1238    }
1239}
1240
1241#[derive(Default)]
1242pub struct InMemoryJobOrchestrator {
1243    statuses: Mutex<HashMap<String, JobStatus>>,
1244    callbacks: Mutex<Vec<JobCompletionCallback>>,
1245    next_id: Mutex<u64>,
1246}
1247
1248impl InMemoryJobOrchestrator {
1249    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
1250        let status = self.with_status_mut(id, |status| {
1251            status.state = JobState::Succeeded;
1252            status.result = Some(result);
1253            status.error = None;
1254        })?;
1255        self.notify(&status);
1256        Ok(())
1257    }
1258
1259    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
1260        let status = self.with_status_mut(id, |status| {
1261            status.state = JobState::Failed;
1262            status.result = None;
1263            status.error = Some(error);
1264        })?;
1265        self.notify(&status);
1266        Ok(())
1267    }
1268
1269    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
1270    where
1271        F: FnOnce(&mut JobStatus),
1272    {
1273        let mut statuses = self.statuses.lock().map_err(|_| {
1274            IntegrationError::new(
1275                "trigger",
1276                IntegrationErrorKind::Unavailable,
1277                "job status lock poisoned",
1278            )
1279        })?;
1280        let Some(status) = statuses.get_mut(id) else {
1281            return Err(IntegrationError::new(
1282                "trigger",
1283                IntegrationErrorKind::InvalidInput,
1284                "job not found",
1285            )
1286            .with_code("job_not_found"));
1287        };
1288        status.attempts = status.attempts.saturating_add(1);
1289        update(status);
1290        Ok(status.clone())
1291    }
1292
1293    fn notify(&self, status: &JobStatus) {
1294        if let Ok(callbacks) = self.callbacks.lock() {
1295            for callback in callbacks.iter() {
1296                callback(status);
1297            }
1298        }
1299    }
1300
1301    fn is_terminal(state: JobState) -> bool {
1302        matches!(
1303            state,
1304            JobState::Succeeded | JobState::Failed | JobState::Canceled
1305        )
1306    }
1307}
1308
1309impl JobOrchestrator for InMemoryJobOrchestrator {
1310    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
1311        let started_at = Instant::now();
1312        let workflow = request.workflow.clone();
1313        let idempotency_key = request.idempotency_key.clone();
1314        let mut next_id = self.next_id.lock().map_err(|_| {
1315            IntegrationError::new(
1316                "trigger",
1317                IntegrationErrorKind::Unavailable,
1318                "job id lock poisoned",
1319            )
1320        })?;
1321        *next_id = next_id.saturating_add(1);
1322        let id = format!("job-{next_id}");
1323        let status = JobStatus {
1324            id: id.clone(),
1325            state: JobState::Queued,
1326            attempts: 0,
1327            result: None,
1328            error: None,
1329        };
1330        self.statuses
1331            .lock()
1332            .map_err(|_| {
1333                IntegrationError::new(
1334                    "trigger",
1335                    IntegrationErrorKind::Unavailable,
1336                    "job status lock poisoned",
1337                )
1338            })?
1339            .insert(id.clone(), status);
1340        let handle = JobHandle {
1341            id,
1342            workflow: request.workflow,
1343            idempotency_key: request.idempotency_key,
1344        };
1345        info!(
1346            target: "shelly.integration.query",
1347            source = "trigger",
1348            operation = "enqueue",
1349            workflow,
1350            idempotency_key,
1351            job_id = handle.id.as_str(),
1352            duration_ms = started_at.elapsed().as_millis() as u64,
1353            "Shelly integration call executed"
1354        );
1355        Ok(handle)
1356    }
1357
1358    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
1359        let started_at = Instant::now();
1360        let result = self
1361            .statuses
1362            .lock()
1363            .map_err(|_| {
1364                IntegrationError::new(
1365                    "trigger",
1366                    IntegrationErrorKind::Unavailable,
1367                    "job status lock poisoned",
1368                )
1369            })?
1370            .get(id)
1371            .cloned()
1372            .ok_or_else(|| {
1373                IntegrationError::new(
1374                    "trigger",
1375                    IntegrationErrorKind::InvalidInput,
1376                    "job not found",
1377                )
1378                .with_code("job_not_found")
1379            });
1380        match &result {
1381            Ok(status) => info!(
1382                target: "shelly.integration.query",
1383                source = "trigger",
1384                operation = "status",
1385                job_id = id,
1386                job_state = ?status.state,
1387                attempts = status.attempts,
1388                duration_ms = started_at.elapsed().as_millis() as u64,
1389                "Shelly integration call executed"
1390            ),
1391            Err(err) => warn!(
1392                target: "shelly.integration.query",
1393                source = "trigger",
1394                operation = "status",
1395                job_id = id,
1396                duration_ms = started_at.elapsed().as_millis() as u64,
1397                error = %err,
1398                "Shelly integration call failed"
1399            ),
1400        }
1401        result
1402    }
1403
1404    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1405        let started_at = Instant::now();
1406        let attempts = attempts.max(1);
1407        let mut polled = 0u32;
1408        for current in 1..=attempts {
1409            let status = self.status(id)?;
1410            polled = current;
1411            if Self::is_terminal(status.state) || current == attempts {
1412                info!(
1413                    target: "shelly.integration.query",
1414                    source = "trigger",
1415                    operation = "poll",
1416                    job_id = id,
1417                    attempts = polled,
1418                    backoff_ms,
1419                    terminal = Self::is_terminal(status.state),
1420                    job_state = ?status.state,
1421                    duration_ms = started_at.elapsed().as_millis() as u64,
1422                    "Shelly integration call executed"
1423                );
1424                return Ok(status);
1425            }
1426            if backoff_ms > 0 {
1427                thread::sleep(Duration::from_millis(backoff_ms));
1428            }
1429        }
1430
1431        let result = self.status(id);
1432        match &result {
1433            Ok(status) => info!(
1434                target: "shelly.integration.query",
1435                source = "trigger",
1436                operation = "poll",
1437                job_id = id,
1438                attempts = polled,
1439                backoff_ms,
1440                terminal = Self::is_terminal(status.state),
1441                job_state = ?status.state,
1442                duration_ms = started_at.elapsed().as_millis() as u64,
1443                "Shelly integration call executed"
1444            ),
1445            Err(err) => warn!(
1446                target: "shelly.integration.query",
1447                source = "trigger",
1448                operation = "poll",
1449                job_id = id,
1450                attempts = polled,
1451                backoff_ms,
1452                duration_ms = started_at.elapsed().as_millis() as u64,
1453                error = %err,
1454                "Shelly integration call failed"
1455            ),
1456        }
1457        result
1458    }
1459
1460    fn register_completion_callback(&self, callback: JobCompletionCallback) {
1461        if let Ok(mut callbacks) = self.callbacks.lock() {
1462            callbacks.push(callback);
1463            info!(
1464                target: "shelly.integration.query",
1465                source = "trigger",
1466                operation = "register_completion_callback",
1467                callback_count = callbacks.len(),
1468                "Shelly integration callback registered"
1469            );
1470        } else {
1471            warn!(
1472                target: "shelly.integration.query",
1473                source = "trigger",
1474                operation = "register_completion_callback",
1475                "Shelly integration callback registration failed"
1476            );
1477        }
1478    }
1479}
1480
1481#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1482pub struct AdapterConformanceCheck {
1483    pub name: String,
1484    pub passed: bool,
1485    pub detail: String,
1486}
1487
1488#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
1489pub struct AdapterConformanceReport {
1490    pub checks: Vec<AdapterConformanceCheck>,
1491}
1492
1493impl AdapterConformanceReport {
1494    pub fn passed(&self) -> bool {
1495        self.checks.iter().all(|check| check.passed)
1496    }
1497
1498    pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1499        self.checks.push(AdapterConformanceCheck {
1500            name: name.into(),
1501            passed: true,
1502            detail: detail.into(),
1503        });
1504    }
1505
1506    pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1507        self.checks.push(AdapterConformanceCheck {
1508            name: name.into(),
1509            passed: false,
1510            detail: detail.into(),
1511        });
1512    }
1513}
1514
1515pub fn run_adapter_conformance_suite(
1516    singlestore: &dyn SingleStoreAdapter,
1517    clickhouse: &dyn ClickHouseAdapter,
1518    bigquery: &dyn BigQueryAdapter,
1519    opensearch: &dyn OpenSearchAdapter,
1520    trigger: &dyn TriggerDevAdapter,
1521    analytics: &dyn AnalyticsSink,
1522    context: &QueryContext,
1523) -> AdapterConformanceReport {
1524    let mut report = AdapterConformanceReport::default();
1525
1526    match singlestore.run_query(
1527        SqlCommand::new(
1528            "SELECT tenant, active_accounts FROM tenant_rollup",
1529            Vec::new(),
1530        ),
1531        context,
1532    ) {
1533        Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
1534        Err(err) => report.add_fail("singlestore.query", err.to_string()),
1535    }
1536
1537    match clickhouse.run_query(
1538        SqlCommand::new("SELECT account, events FROM account_events", Vec::new()),
1539        context,
1540    ) {
1541        Ok(rows) => report.add_pass("clickhouse.query", format!("rows={}", rows.len())),
1542        Err(err) => report.add_fail("clickhouse.query", err.to_string()),
1543    }
1544
1545    match bigquery.run_query(
1546        SqlCommand::new("SELECT account, arr FROM analytics_rollup", Vec::new()),
1547        context,
1548    ) {
1549        Ok(rows) => report.add_pass("bigquery.query", format!("rows={}", rows.len())),
1550        Err(err) => report.add_fail("bigquery.query", err.to_string()),
1551    }
1552
1553    match opensearch.search(SearchRequest::new("accounts", ""), context) {
1554        Ok(response) => report.add_pass(
1555            "opensearch.search",
1556            format!(
1557                "rows={} total_hits={}",
1558                response.rows.len(),
1559                response.total_hits
1560            ),
1561        ),
1562        Err(err) => report.add_fail("opensearch.search", err.to_string()),
1563    }
1564
1565    match trigger.trigger_workflow(JobRequest::new(
1566        "adapter_conformance_probe",
1567        serde_json::json!({"probe": true}),
1568        "adapter-conformance-probe",
1569    )) {
1570        Ok(handle) => {
1571            report.add_pass("trigger.enqueue", format!("id={}", handle.id));
1572            match trigger.workflow_status(&handle.id) {
1573                Ok(status) => report.add_pass(
1574                    "trigger.status",
1575                    format!("state={:?} attempts={}", status.state, status.attempts),
1576                ),
1577                Err(err) => report.add_fail("trigger.status", err.to_string()),
1578            }
1579        }
1580        Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
1581    }
1582
1583    let analytics_event = AnalyticsEvent::new(
1584        "adapter_conformance",
1585        "probe",
1586        serde_json::json!({
1587            "trace_id": context.trace_id,
1588            "correlation_id": context.correlation_id(),
1589            "request_id": context.request_id(),
1590        }),
1591    );
1592    match analytics.send_event(analytics_event) {
1593        Ok(()) => report.add_pass("analytics.emit", "event accepted"),
1594        Err(err) => report.add_fail("analytics.emit", err.to_string()),
1595    }
1596
1597    let context_correlation_check =
1598        context.correlation_id().is_some() && context.request_id().is_some();
1599    if context_correlation_check {
1600        report.add_pass(
1601            "context.correlation",
1602            "correlation_id and request_id present",
1603        );
1604    } else {
1605        report.add_fail(
1606            "context.correlation",
1607            "missing correlation_id/request_id; set QueryContext::with_correlation_id + with_request_id",
1608        );
1609    }
1610
1611    let retry_override_context = context.clone().with_retry_policy(RetryPolicy {
1612        max_attempts: 2,
1613        initial_backoff_ms: 0,
1614        max_backoff_ms: 0,
1615    });
1616    let mut retry_attempts = 0u32;
1617    match run_with_contract(
1618        "conformance",
1619        "retry_override_probe",
1620        AdapterCallContract::default_query(),
1621        &retry_override_context,
1622        |attempt| {
1623            retry_attempts = attempt;
1624            if attempt == 1 {
1625                Err(IntegrationError::new(
1626                    "conformance",
1627                    IntegrationErrorKind::Transient,
1628                    "transient probe failure",
1629                ))
1630            } else {
1631                Ok("ok")
1632            }
1633        },
1634    ) {
1635        Ok(_) if retry_attempts == 2 => report.add_pass(
1636            "contract.retry_override",
1637            "retry override honored with bounded attempts",
1638        ),
1639        Ok(_) => report.add_fail(
1640            "contract.retry_override",
1641            format!("unexpected attempts={retry_attempts} expected=2"),
1642        ),
1643        Err(err) => report.add_fail("contract.retry_override", err.to_string()),
1644    }
1645
1646    let timeout_context = context.clone().with_timeout_ms(1);
1647    match run_with_contract(
1648        "conformance",
1649        "timeout_probe",
1650        AdapterCallContract::default_query(),
1651        &timeout_context,
1652        |_| {
1653            thread::sleep(Duration::from_millis(5));
1654            Ok::<_, IntegrationError>("done")
1655        },
1656    ) {
1657        Err(err) if err.kind == IntegrationErrorKind::Timeout => {
1658            report.add_pass("contract.timeout", "timeout classification validated")
1659        }
1660        Err(err) => report.add_fail("contract.timeout", err.to_string()),
1661        Ok(_) => report.add_fail(
1662            "contract.timeout",
1663            "expected timeout but operation returned success",
1664        ),
1665    }
1666
1667    report
1668}
1669
1670pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1671    DataError::Integration(format!("[{}] {}", source.into(), err))
1672}
1673
1674pub fn map_integration_result<T>(
1675    source: impl Into<String>,
1676    result: IntegrationResult<T>,
1677) -> DataResult<T> {
1678    result.map_err(|err| map_integration_error(source, err))
1679}
1680
1681pub fn query_from_search(request: &SearchRequest) -> Query {
1682    let mut query = Query::new().paginate(request.page, request.per_page);
1683    for filter in &request.filters {
1684        query = query.where_filter(filter.clone());
1685    }
1686    query
1687}
1688
1689fn materialize_window_response(
1690    source: &str,
1691    request: DataWindowRequest,
1692    rows: Vec<Row>,
1693    total_rows_hint: Option<usize>,
1694) -> DataWindowResponse {
1695    let total_rows = total_rows_hint.unwrap_or(rows.len());
1696    let offset = request.offset.min(total_rows);
1697    let limit = request.limit.max(1);
1698    let window_rows = rows
1699        .into_iter()
1700        .skip(offset)
1701        .take(limit)
1702        .collect::<Vec<_>>();
1703    let query_token = request
1704        .query_token
1705        .unwrap_or_else(|| format!("{source}:{}:{}", request.dataset, total_rows));
1706    let window_token = request.window_token.unwrap_or_else(|| {
1707        format!(
1708            "{query_token}:offset={offset}:limit={limit}:rows={}",
1709            window_rows.len()
1710        )
1711    });
1712    let compact_rows = (request.wire_format == WireFormatProfile::Compact)
1713        .then(|| encode_compact_rows(&window_rows));
1714
1715    DataWindowResponse {
1716        dataset: request.dataset,
1717        offset,
1718        limit,
1719        total_rows,
1720        query_token,
1721        window_token,
1722        rows: window_rows,
1723        compact_rows,
1724    }
1725}
1726
1727fn encode_compact_rows(rows: &[Row]) -> CompactRowsPayload {
1728    let mut columns: Vec<String> = Vec::new();
1729    for row in rows {
1730        for key in row.keys() {
1731            if !columns.iter().any(|column| column == key) {
1732                columns.push(key.clone());
1733            }
1734        }
1735    }
1736
1737    let encoded_rows = rows
1738        .iter()
1739        .map(|row| {
1740            columns
1741                .iter()
1742                .map(|column| row.get(column).cloned().unwrap_or(Value::Null))
1743                .collect::<Vec<_>>()
1744        })
1745        .collect::<Vec<_>>();
1746
1747    CompactRowsPayload {
1748        columns,
1749        rows: encoded_rows,
1750    }
1751}
1752
1753#[cfg(test)]
1754mod tests {
1755    use super::{
1756        map_integration_result, run_adapter_conformance_suite, run_with_contract,
1757        run_with_contract_async, run_with_retry, run_with_retry_async, AdapterCallContract,
1758        AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge, BigQueryAdapter, ClickHouseAdapter,
1759        ConnectionLifecycle, ConnectionLifecycleHook, DataWindowRequest, InMemoryAxiomSink,
1760        InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryJobOrchestrator,
1761        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
1762        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
1763        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
1764        SearchResponse, SingleStoreAdapter, SqlCommand, TriggerDevAdapter, TypedQueryBoundary,
1765        CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID, CONTEXT_TAG_TIMEOUT_MS,
1766    };
1767    use serde_json::{json, Value};
1768    use std::sync::{Arc, Mutex};
1769
1770    struct CountingHook {
1771        connects: Arc<Mutex<u32>>,
1772        disconnects: Arc<Mutex<u32>>,
1773    }
1774
1775    impl ConnectionLifecycleHook for CountingHook {
1776        fn on_connect(&self) -> super::IntegrationResult<()> {
1777            let mut guard = self.connects.lock().unwrap();
1778            *guard += 1;
1779            Ok(())
1780        }
1781
1782        fn on_disconnect(&self) -> super::IntegrationResult<()> {
1783            let mut guard = self.disconnects.lock().unwrap();
1784            *guard += 1;
1785            Ok(())
1786        }
1787    }
1788
1789    #[test]
1790    fn lifecycle_hooks_are_called() {
1791        let connects = Arc::new(Mutex::new(0));
1792        let disconnects = Arc::new(Mutex::new(0));
1793        let mut lifecycle = LifecycleHooks::new();
1794        lifecycle.register_hook(Arc::new(CountingHook {
1795            connects: connects.clone(),
1796            disconnects: disconnects.clone(),
1797        }));
1798
1799        lifecycle.connect().unwrap();
1800        lifecycle.disconnect().unwrap();
1801        assert_eq!(*connects.lock().unwrap(), 1);
1802        assert_eq!(*disconnects.lock().unwrap(), 1);
1803    }
1804
1805    #[test]
1806    fn retry_policy_retries_transient_errors() {
1807        let mut calls = 0u32;
1808        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
1809            calls = attempt;
1810            if attempt < 3 {
1811                Err(IntegrationError::new(
1812                    "opensearch",
1813                    IntegrationErrorKind::Transient,
1814                    "temporary failure",
1815                ))
1816            } else {
1817                Ok("ok")
1818            }
1819        })
1820        .unwrap();
1821
1822        assert_eq!(result, "ok");
1823        assert_eq!(calls, 3);
1824    }
1825
1826    #[test]
1827    fn retry_policy_stops_on_permanent_errors() {
1828        let mut calls = 0u32;
1829        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
1830            calls = attempt;
1831            Err::<(), IntegrationError>(IntegrationError::new(
1832                "singlestore",
1833                IntegrationErrorKind::Permanent,
1834                "invalid sql",
1835            ))
1836        })
1837        .unwrap_err();
1838
1839        assert_eq!(calls, 1);
1840        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
1841    }
1842
1843    #[test]
1844    fn integration_result_maps_into_data_error() {
1845        let mapped = map_integration_result::<()>(
1846            "trigger",
1847            Err(IntegrationError::new(
1848                "trigger",
1849                IntegrationErrorKind::Unavailable,
1850                "service unavailable",
1851            )),
1852        )
1853        .unwrap_err();
1854
1855        assert!(mapped.to_string().contains("service unavailable"));
1856    }
1857
1858    #[derive(Default)]
1859    struct InMemoryJobs {
1860        statuses: Arc<Mutex<Vec<JobStatus>>>,
1861        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
1862    }
1863
1864    impl JobOrchestrator for InMemoryJobs {
1865        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
1866            let id = format!("job-{}", request.idempotency_key);
1867            self.statuses.lock().unwrap().push(JobStatus {
1868                id: id.clone(),
1869                state: JobState::Queued,
1870                attempts: 0,
1871                result: None,
1872                error: None,
1873            });
1874            Ok(JobHandle {
1875                id,
1876                workflow: request.workflow,
1877                idempotency_key: request.idempotency_key,
1878            })
1879        }
1880
1881        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
1882            self.statuses
1883                .lock()
1884                .unwrap()
1885                .iter()
1886                .find(|status| status.id == id)
1887                .cloned()
1888                .ok_or_else(|| {
1889                    IntegrationError::new(
1890                        "trigger",
1891                        IntegrationErrorKind::InvalidInput,
1892                        "job not found",
1893                    )
1894                })
1895        }
1896
1897        fn poll(
1898            &self,
1899            id: &str,
1900            _attempts: u32,
1901            _backoff_ms: u64,
1902        ) -> super::IntegrationResult<JobStatus> {
1903            self.status(id)
1904        }
1905
1906        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
1907            self.callbacks.lock().unwrap().push(callback);
1908        }
1909    }
1910
1911    #[test]
1912    fn job_orchestration_contract_supports_enqueue_and_status() {
1913        let jobs = InMemoryJobs::default();
1914        let handle = jobs
1915            .enqueue(JobRequest::new(
1916                "sync_customer",
1917                json!({"id": 42}),
1918                "idempotent-42",
1919            ))
1920            .unwrap();
1921        let status = jobs.status(&handle.id).unwrap();
1922        assert_eq!(status.state, JobState::Queued);
1923        assert_eq!(handle.idempotency_key, "idempotent-42");
1924
1925        let ctx = QueryContext::default().with_tenant_id("tenant-a");
1926        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
1927    }
1928
1929    #[test]
1930    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
1931        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
1932            "region".to_string(),
1933            Value::String("EMEA".to_string()),
1934        )])]);
1935        let rows = adapter
1936            .run_query(
1937                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
1938                &QueryContext::default(),
1939            )
1940            .unwrap();
1941        assert_eq!(rows.len(), 1);
1942        assert_eq!(
1943            rows[0].get("region"),
1944            Some(&Value::String("EMEA".to_string()))
1945        );
1946    }
1947
1948    #[test]
1949    fn reference_clickhouse_adapter_runs_typed_sql_boundary() {
1950        let adapter = InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
1951            "events".to_string(),
1952            Value::Number(128.into()),
1953        )])]);
1954        let rows = adapter
1955            .run_query(
1956                SqlCommand::new("SELECT events FROM account_events", Vec::new()),
1957                &QueryContext::default(),
1958            )
1959            .unwrap();
1960        assert_eq!(rows.len(), 1);
1961        assert_eq!(rows[0].get("events"), Some(&Value::Number(128.into())));
1962    }
1963
1964    #[test]
1965    fn reference_bigquery_adapter_runs_typed_sql_boundary() {
1966        let adapter = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
1967            "active_accounts".to_string(),
1968            Value::Number(64.into()),
1969        )])]);
1970        let rows = adapter
1971            .run_query(
1972                SqlCommand::new("SELECT active_accounts FROM analytics_rollup", Vec::new()),
1973                &QueryContext::default(),
1974            )
1975            .unwrap();
1976        assert_eq!(rows.len(), 1);
1977        assert_eq!(
1978            rows[0].get("active_accounts"),
1979            Some(&Value::Number(64.into()))
1980        );
1981    }
1982
1983    #[test]
1984    fn reference_opensearch_adapter_filters_rows() {
1985        let rows = vec![
1986            crate::StoredRow {
1987                id: 1,
1988                data: std::collections::BTreeMap::from([(
1989                    "title".to_string(),
1990                    Value::String("Acme renewal".to_string()),
1991                )]),
1992            },
1993            crate::StoredRow {
1994                id: 2,
1995                data: std::collections::BTreeMap::from([(
1996                    "title".to_string(),
1997                    Value::String("Globex onboarding".to_string()),
1998                )]),
1999            },
2000        ];
2001        let adapter = InMemoryOpenSearchAdapter::new(rows);
2002        let response = adapter
2003            .search(
2004                SearchRequest::new("accounts", "renewal"),
2005                &QueryContext::default(),
2006            )
2007            .unwrap();
2008        assert_eq!(response.total_hits, 1);
2009        assert_eq!(response.rows[0].id, 1);
2010    }
2011
2012    #[test]
2013    fn high_volume_window_query_profiles_support_compact_payloads() {
2014        let sql_rows = vec![
2015            std::collections::BTreeMap::from([
2016                ("tenant".to_string(), Value::String("north".to_string())),
2017                ("arr".to_string(), Value::Number(120.into())),
2018            ]),
2019            std::collections::BTreeMap::from([
2020                ("tenant".to_string(), Value::String("south".to_string())),
2021                ("arr".to_string(), Value::Number(210.into())),
2022            ]),
2023            std::collections::BTreeMap::from([
2024                ("tenant".to_string(), Value::String("west".to_string())),
2025                ("arr".to_string(), Value::Number(330.into())),
2026            ]),
2027            std::collections::BTreeMap::from([
2028                ("tenant".to_string(), Value::String("east".to_string())),
2029                ("arr".to_string(), Value::Number(480.into())),
2030            ]),
2031        ];
2032        let context = QueryContext::default();
2033        let request = DataWindowRequest::new("tenant_rollup", 1, 2)
2034            .with_query_token("q:m51")
2035            .compact();
2036
2037        let singlestore = InMemorySingleStoreAdapter::new(sql_rows.clone());
2038        let clickhouse = InMemoryClickHouseAdapter::new(sql_rows.clone());
2039        let bigquery = InMemoryBigQueryAdapter::new(sql_rows.clone());
2040
2041        for response in [
2042            singlestore
2043                .run_high_volume_window_query(
2044                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2045                    request.clone(),
2046                    &context,
2047                )
2048                .expect("singlestore window query"),
2049            clickhouse
2050                .run_high_volume_window_query(
2051                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2052                    request.clone(),
2053                    &context,
2054                )
2055                .expect("clickhouse window query"),
2056            bigquery
2057                .run_high_volume_window_query(
2058                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2059                    request.clone(),
2060                    &context,
2061                )
2062                .expect("bigquery window query"),
2063        ] {
2064            assert_eq!(response.dataset, "tenant_rollup");
2065            assert_eq!(response.rows.len(), 2);
2066            assert_eq!(response.total_rows, 4);
2067            assert_eq!(response.query_token, "q:m51");
2068            let compact = response.compact_rows.expect("compact payload");
2069            assert!(compact.columns.contains(&"tenant".to_string()));
2070            assert!(compact.columns.contains(&"arr".to_string()));
2071            assert_eq!(compact.rows.len(), response.rows.len());
2072        }
2073    }
2074
2075    #[test]
2076    fn opensearch_window_search_uses_window_contract() {
2077        let rows = vec![
2078            crate::StoredRow {
2079                id: 1,
2080                data: std::collections::BTreeMap::from([
2081                    ("tenant".to_string(), Value::String("acme".to_string())),
2082                    ("status".to_string(), Value::String("healthy".to_string())),
2083                ]),
2084            },
2085            crate::StoredRow {
2086                id: 2,
2087                data: std::collections::BTreeMap::from([
2088                    ("tenant".to_string(), Value::String("globex".to_string())),
2089                    ("status".to_string(), Value::String("renewal".to_string())),
2090                ]),
2091            },
2092            crate::StoredRow {
2093                id: 3,
2094                data: std::collections::BTreeMap::from([
2095                    ("tenant".to_string(), Value::String("initech".to_string())),
2096                    ("status".to_string(), Value::String("at-risk".to_string())),
2097                ]),
2098            },
2099        ];
2100        let adapter = InMemoryOpenSearchAdapter::new(rows);
2101        let response = adapter
2102            .search_window(
2103                SearchRequest::new("accounts", ""),
2104                DataWindowRequest::new("accounts", 1, 2).compact(),
2105                &QueryContext::default(),
2106            )
2107            .expect("opensearch window search");
2108
2109        assert_eq!(response.dataset, "accounts");
2110        assert_eq!(response.offset, 1);
2111        assert_eq!(response.limit, 2);
2112        assert_eq!(response.total_rows, 3);
2113        assert_eq!(response.rows.len(), 2);
2114        assert!(response.compact_rows.is_some());
2115        assert!(!response.window_token.is_empty());
2116    }
2117
2118    #[test]
2119    fn reference_axiom_sink_records_events() {
2120        let sink = InMemoryAxiomSink::default();
2121        sink.send_event(AnalyticsEvent::new(
2122            "sales",
2123            "query_executed",
2124            json!({"latency_ms": 12}),
2125        ))
2126        .unwrap();
2127        let events = sink.events();
2128        assert_eq!(events.len(), 1);
2129        assert_eq!(events[0].name, "query_executed");
2130    }
2131
2132    #[test]
2133    fn reference_trigger_orchestrator_supports_completion_and_polling() {
2134        let orchestrator = InMemoryJobOrchestrator::default();
2135        let completed = Arc::new(Mutex::new(false));
2136        let completed_flag = completed.clone();
2137        orchestrator.register_completion_callback(Arc::new(move |status| {
2138            if status.state == JobState::Succeeded {
2139                if let Ok(mut guard) = completed_flag.lock() {
2140                    *guard = true;
2141                }
2142            }
2143        }));
2144
2145        let handle = orchestrator
2146            .enqueue(JobRequest::new(
2147                "refresh_dashboard",
2148                json!({"account_id": 7}),
2149                "refresh-7",
2150            ))
2151            .unwrap();
2152        orchestrator
2153            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
2154            .unwrap();
2155        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
2156        assert_eq!(status.state, JobState::Succeeded);
2157        assert_eq!(
2158            status.result,
2159            Some(json!({
2160                "rows_synced": 18
2161            }))
2162        );
2163        assert!(*completed.lock().unwrap());
2164    }
2165
2166    #[test]
2167    fn query_context_helpers_encode_trace_correlation_retry_timeout() {
2168        let context = QueryContext::default()
2169            .with_correlation_id("corr-42")
2170            .with_request_id("req-42")
2171            .with_timeout_ms(900)
2172            .with_retry_policy(RetryPolicy {
2173                max_attempts: 4,
2174                initial_backoff_ms: 25,
2175                max_backoff_ms: 100,
2176            });
2177
2178        assert_eq!(
2179            context
2180                .tags
2181                .get(CONTEXT_TAG_CORRELATION_ID)
2182                .map(String::as_str),
2183            Some("corr-42")
2184        );
2185        assert_eq!(
2186            context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
2187            Some("req-42")
2188        );
2189        assert_eq!(
2190            context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
2191            Some("900")
2192        );
2193        assert_eq!(context.correlation_id(), Some("corr-42"));
2194        assert_eq!(context.request_id(), Some("req-42"));
2195        assert_eq!(context.timeout_ms(), Some(900));
2196        assert_eq!(
2197            context.retry_policy_override(),
2198            Some(RetryPolicy {
2199                max_attempts: 4,
2200                initial_backoff_ms: 25,
2201                max_backoff_ms: 100
2202            })
2203        );
2204    }
2205
2206    #[test]
2207    fn run_with_contract_respects_context_retry_override() {
2208        let context = QueryContext::default().with_retry_policy(RetryPolicy {
2209            max_attempts: 2,
2210            initial_backoff_ms: 0,
2211            max_backoff_ms: 0,
2212        });
2213        let mut attempts = 0u32;
2214        let result = run_with_contract(
2215            "opensearch",
2216            "search",
2217            AdapterCallContract::default()
2218                .with_retry_policy(RetryPolicy::never())
2219                .with_timeout_ms(1_000),
2220            &context,
2221            |attempt| {
2222                attempts = attempt;
2223                if attempt == 1 {
2224                    Err(IntegrationError::new(
2225                        "opensearch",
2226                        IntegrationErrorKind::Transient,
2227                        "temporary failure",
2228                    ))
2229                } else {
2230                    Ok("ok")
2231                }
2232            },
2233        )
2234        .unwrap();
2235        assert_eq!(result, "ok");
2236        assert_eq!(attempts, 2);
2237    }
2238
2239    #[tokio::test]
2240    async fn run_with_contract_async_respects_context_retry_override() {
2241        let context = QueryContext::default().with_retry_policy(RetryPolicy {
2242            max_attempts: 2,
2243            initial_backoff_ms: 0,
2244            max_backoff_ms: 0,
2245        });
2246        let attempts = Arc::new(Mutex::new(0u32));
2247        let attempts_for_closure = attempts.clone();
2248        let result = run_with_contract_async(
2249            "opensearch",
2250            "search_async",
2251            AdapterCallContract::default()
2252                .with_retry_policy(RetryPolicy::never())
2253                .with_timeout_ms(1_000),
2254            &context,
2255            move |attempt| {
2256                let attempts_for_step = attempts_for_closure.clone();
2257                async move {
2258                    if let Ok(mut guard) = attempts_for_step.lock() {
2259                        *guard = attempt;
2260                    }
2261                    if attempt == 1 {
2262                        Err(IntegrationError::new(
2263                            "opensearch",
2264                            IntegrationErrorKind::Transient,
2265                            "temporary failure",
2266                        ))
2267                    } else {
2268                        Ok("ok")
2269                    }
2270                }
2271            },
2272        )
2273        .await
2274        .unwrap();
2275        assert_eq!(result, "ok");
2276        assert_eq!(*attempts.lock().unwrap(), 2);
2277    }
2278
2279    #[tokio::test]
2280    async fn run_with_retry_async_retries_transient_errors() {
2281        let calls = Arc::new(Mutex::new(0u32));
2282        let calls_for_closure = calls.clone();
2283        let result = run_with_retry_async(RetryPolicy::conservative(), move |attempt| {
2284            let calls_for_step = calls_for_closure.clone();
2285            async move {
2286                if let Ok(mut guard) = calls_for_step.lock() {
2287                    *guard = attempt;
2288                }
2289                if attempt < 3 {
2290                    Err(IntegrationError::new(
2291                        "bigquery",
2292                        IntegrationErrorKind::Transient,
2293                        "temporary failure",
2294                    ))
2295                } else {
2296                    Ok("ok")
2297                }
2298            }
2299        })
2300        .await
2301        .unwrap();
2302        assert_eq!(result, "ok");
2303        assert_eq!(*calls.lock().unwrap(), 3);
2304    }
2305
2306    #[test]
2307    fn run_with_contract_returns_timeout_error() {
2308        let context = QueryContext::default().with_timeout_ms(5);
2309        let err = run_with_contract(
2310            "singlestore",
2311            "run_query",
2312            AdapterCallContract::default(),
2313            &context,
2314            |_| {
2315                std::thread::sleep(std::time::Duration::from_millis(15));
2316                Ok::<_, IntegrationError>("done")
2317            },
2318        )
2319        .unwrap_err();
2320        assert_eq!(err.kind, IntegrationErrorKind::Timeout);
2321        assert_eq!(err.code.as_deref(), Some("operation_timeout"));
2322    }
2323
2324    #[test]
2325    fn axiom_bridge_enriches_events_with_trace_and_correlation() {
2326        let sink = Arc::new(InMemoryAxiomSink::default());
2327        let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
2328        let context = QueryContext {
2329            tenant_id: Some("tenant-a".to_string()),
2330            trace_id: Some("trace-123".to_string()),
2331            tags: std::collections::BTreeMap::new(),
2332        }
2333        .with_correlation_id("corr-9")
2334        .with_request_id("req-9");
2335        bridge
2336            .emit("session_event", json!({"event":"patch"}), &context)
2337            .unwrap();
2338        let events = sink.events();
2339        assert_eq!(events.len(), 1);
2340        let payload = events[0].payload.as_object().unwrap();
2341        assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
2342        assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
2343        assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
2344    }
2345
2346    #[test]
2347    fn conformance_suite_passes_with_reference_adapters() {
2348        let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
2349            [("tenant".to_string(), Value::String("north".to_string()))],
2350        )]);
2351        let clickhouse =
2352            InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
2353                "events".to_string(),
2354                Value::Number(99.into()),
2355            )])]);
2356        let bigquery = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
2357            "accounts".to_string(),
2358            Value::Number(33.into()),
2359        )])]);
2360        let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
2361            id: 1,
2362            data: std::collections::BTreeMap::from([(
2363                "title".to_string(),
2364                Value::String("Acme renewal".to_string()),
2365            )]),
2366        }]);
2367        let trigger = InMemoryJobOrchestrator::default();
2368        let analytics = InMemoryAxiomSink::default();
2369        let report = run_adapter_conformance_suite(
2370            &singlestore,
2371            &clickhouse,
2372            &bigquery,
2373            &opensearch,
2374            &trigger,
2375            &analytics,
2376            &QueryContext::default()
2377                .with_correlation_id("corr-1")
2378                .with_request_id("req-1"),
2379        );
2380        assert!(report.passed(), "report={report:?}");
2381        assert!(report.checks.len() >= 8);
2382    }
2383
2384    #[test]
2385    fn adapter_error_paths_cover_invalid_inputs_and_failure_reporting() {
2386        let context = QueryContext::default();
2387        let empty_sql = SqlCommand::new("   ", Vec::new());
2388        let singlestore = InMemorySingleStoreAdapter::default();
2389        let clickhouse = InMemoryClickHouseAdapter::default();
2390        let bigquery = InMemoryBigQueryAdapter::default();
2391        let opensearch = InMemoryOpenSearchAdapter::default();
2392
2393        for err in [
2394            singlestore
2395                .run_query(empty_sql.clone(), &context)
2396                .unwrap_err(),
2397            clickhouse
2398                .run_query(empty_sql.clone(), &context)
2399                .unwrap_err(),
2400            bigquery.run_query(empty_sql, &context).unwrap_err(),
2401        ] {
2402            assert_eq!(err.kind, IntegrationErrorKind::InvalidInput);
2403            assert_eq!(err.code.as_deref(), Some("empty_statement"));
2404        }
2405
2406        let search_err = opensearch
2407            .search(SearchRequest::new("   ", "needle"), &context)
2408            .unwrap_err();
2409        assert_eq!(search_err.kind, IntegrationErrorKind::InvalidInput);
2410        assert_eq!(search_err.code.as_deref(), Some("empty_index"));
2411    }
2412
2413    struct FailingSqlAdapter {
2414        source: &'static str,
2415    }
2416
2417    impl TypedQueryBoundary for FailingSqlAdapter {
2418        type Request = SqlCommand;
2419        type Response = Vec<crate::Row>;
2420
2421        fn execute(
2422            &self,
2423            _request: &Self::Request,
2424            _context: &QueryContext,
2425        ) -> super::IntegrationResult<Self::Response> {
2426            Err(IntegrationError::new(
2427                self.source,
2428                IntegrationErrorKind::Unavailable,
2429                "backend unavailable",
2430            )
2431            .with_code("backend_down"))
2432        }
2433    }
2434
2435    impl SingleStoreAdapter for FailingSqlAdapter {}
2436    impl ClickHouseAdapter for FailingSqlAdapter {}
2437    impl BigQueryAdapter for FailingSqlAdapter {}
2438
2439    struct FailingSearchAdapter;
2440
2441    impl TypedQueryBoundary for FailingSearchAdapter {
2442        type Request = SearchRequest;
2443        type Response = SearchResponse;
2444
2445        fn execute(
2446            &self,
2447            _request: &Self::Request,
2448            _context: &QueryContext,
2449        ) -> super::IntegrationResult<Self::Response> {
2450            Err(IntegrationError::new(
2451                "opensearch",
2452                IntegrationErrorKind::Unavailable,
2453                "search unavailable",
2454            ))
2455        }
2456    }
2457
2458    impl OpenSearchAdapter for FailingSearchAdapter {}
2459
2460    struct FailingTrigger;
2461
2462    impl TriggerDevAdapter for FailingTrigger {
2463        fn trigger_workflow(&self, _request: JobRequest) -> super::IntegrationResult<JobHandle> {
2464            Err(IntegrationError::new(
2465                "trigger",
2466                IntegrationErrorKind::Unavailable,
2467                "trigger unavailable",
2468            ))
2469        }
2470
2471        fn workflow_status(&self, _id: &str) -> super::IntegrationResult<JobStatus> {
2472            Err(IntegrationError::new(
2473                "trigger",
2474                IntegrationErrorKind::Unavailable,
2475                "trigger unavailable",
2476            ))
2477        }
2478
2479        fn poll_workflow(
2480            &self,
2481            _id: &str,
2482            _attempts: u32,
2483            _backoff_ms: u64,
2484        ) -> super::IntegrationResult<JobStatus> {
2485            Err(IntegrationError::new(
2486                "trigger",
2487                IntegrationErrorKind::Unavailable,
2488                "trigger unavailable",
2489            ))
2490        }
2491    }
2492
2493    #[derive(Default)]
2494    struct FailingAnalyticsSink;
2495
2496    impl AnalyticsSink for FailingAnalyticsSink {
2497        fn send_event(&self, _event: AnalyticsEvent) -> super::IntegrationResult<()> {
2498            Err(IntegrationError::new(
2499                "analytics",
2500                IntegrationErrorKind::Unavailable,
2501                "sink unavailable",
2502            ))
2503        }
2504    }
2505
2506    #[test]
2507    fn conformance_suite_captures_failures_for_unavailable_dependencies() {
2508        let failing_sql = FailingSqlAdapter { source: "sql" };
2509        let failing_search = FailingSearchAdapter;
2510        let failing_trigger = FailingTrigger;
2511        let failing_analytics = FailingAnalyticsSink;
2512
2513        let report = run_adapter_conformance_suite(
2514            &failing_sql,
2515            &failing_sql,
2516            &failing_sql,
2517            &failing_search,
2518            &failing_trigger,
2519            &failing_analytics,
2520            &QueryContext::default(),
2521        );
2522
2523        assert!(!report.passed());
2524        assert!(report
2525            .checks
2526            .iter()
2527            .any(|check| { check.name == "singlestore.query" && !check.passed }));
2528        assert!(report
2529            .checks
2530            .iter()
2531            .any(|check| { check.name == "opensearch.search" && !check.passed }));
2532        assert!(report
2533            .checks
2534            .iter()
2535            .any(|check| { check.name == "trigger.enqueue" && !check.passed }));
2536        assert!(report
2537            .checks
2538            .iter()
2539            .any(|check| { check.name == "analytics.emit" && !check.passed }));
2540        assert!(report
2541            .checks
2542            .iter()
2543            .any(|check| { check.name == "context.correlation" && !check.passed }));
2544    }
2545}