Skip to main content

shelly_data/
integrations.rs

1use crate::{
2    error::{DataError, DataResult},
3    query::{Query, WireFormatProfile},
4    repo::{CompactRowsPayload, Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    collections::{BTreeMap, HashMap},
10    fmt,
11    future::Future,
12    sync::{Arc, Mutex},
13    thread,
14    time::{Duration, Instant},
15};
16use tokio::time::sleep;
17use tracing::{info, warn};
18
/// Tag key under which [`QueryContext`] stores the correlation id.
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
/// Tag key under which [`QueryContext`] stores the request id.
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
/// Tag key holding a per-call timeout override, in milliseconds.
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
/// Tag key holding the retry override's maximum number of attempts.
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
/// Tag key holding the retry override's initial backoff, in milliseconds.
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
/// Tag key holding the retry override's maximum backoff, in milliseconds.
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
25
/// Classification of an integration failure, serialized in `snake_case`.
///
/// [`IntegrationError::new`] derives the default `retryable` flag from the
/// kind: `Transient`, `RateLimited`, `Timeout` and `Unavailable` are
/// retryable; `Permanent`, `Auth` and `InvalidInput` are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    /// Retryable by default.
    Transient,
    /// Not retryable by default.
    Permanent,
    /// Not retryable by default.
    Auth,
    /// Retryable by default.
    RateLimited,
    /// Retryable by default.
    Timeout,
    /// Retryable by default.
    Unavailable,
    /// Not retryable by default.
    InvalidInput,
}
37
38#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39pub struct IntegrationError {
40    pub source: String,
41    pub kind: IntegrationErrorKind,
42    pub message: String,
43    pub code: Option<String>,
44    pub retryable: bool,
45}
46
47impl IntegrationError {
48    pub fn new(
49        source: impl Into<String>,
50        kind: IntegrationErrorKind,
51        message: impl Into<String>,
52    ) -> Self {
53        let kind_value = kind;
54        Self {
55            source: source.into(),
56            kind: kind_value,
57            message: message.into(),
58            code: None,
59            retryable: matches!(
60                kind_value,
61                IntegrationErrorKind::Transient
62                    | IntegrationErrorKind::RateLimited
63                    | IntegrationErrorKind::Timeout
64                    | IntegrationErrorKind::Unavailable
65            ),
66        }
67    }
68
69    pub fn with_code(mut self, code: impl Into<String>) -> Self {
70        self.code = Some(code.into());
71        self
72    }
73}
74
75impl fmt::Display for IntegrationError {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        if let Some(code) = &self.code {
78            write!(f, "[{}:{}] {}", self.source, code, self.message)
79        } else {
80            write!(f, "[{}] {}", self.source, self.message)
81        }
82    }
83}
84
85impl std::error::Error for IntegrationError {}
86
87pub type IntegrationResult<T> = Result<T, IntegrationError>;
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
90pub struct RetryPolicy {
91    pub max_attempts: u32,
92    pub initial_backoff_ms: u64,
93    pub max_backoff_ms: u64,
94}
95
96impl RetryPolicy {
97    pub fn conservative() -> Self {
98        Self {
99            max_attempts: 3,
100            initial_backoff_ms: 50,
101            max_backoff_ms: 500,
102        }
103    }
104
105    pub fn never() -> Self {
106        Self {
107            max_attempts: 1,
108            initial_backoff_ms: 0,
109            max_backoff_ms: 0,
110        }
111    }
112}
113
114impl Default for RetryPolicy {
115    fn default() -> Self {
116        Self::conservative()
117    }
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121pub struct AdapterCallContract {
122    pub retry_policy: RetryPolicy,
123    pub timeout_ms: u64,
124}
125
126impl AdapterCallContract {
127    pub fn low_latency() -> Self {
128        Self {
129            retry_policy: RetryPolicy::never(),
130            timeout_ms: 250,
131        }
132    }
133
134    pub fn default_query() -> Self {
135        Self {
136            retry_policy: RetryPolicy::conservative(),
137            timeout_ms: 1_500,
138        }
139    }
140
141    pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
142        self.retry_policy = retry_policy;
143        self
144    }
145
146    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
147        self.timeout_ms = timeout_ms.max(1);
148        self
149    }
150}
151
152impl Default for AdapterCallContract {
153    fn default() -> Self {
154        Self::default_query()
155    }
156}
157
158pub fn run_with_contract<T, F>(
159    source: &str,
160    operation: &str,
161    contract: AdapterCallContract,
162    context: &QueryContext,
163    mut operation_fn: F,
164) -> IntegrationResult<T>
165where
166    F: FnMut(u32) -> IntegrationResult<T>,
167{
168    let policy = context
169        .retry_policy_override()
170        .unwrap_or(contract.retry_policy);
171    let attempts = policy.max_attempts.max(1);
172    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
173    let mut backoff_ms = policy.initial_backoff_ms;
174
175    for attempt in 1..=attempts {
176        let started_at = Instant::now();
177        let result = operation_fn(attempt);
178        let elapsed_ms = started_at.elapsed().as_millis() as u64;
179        if elapsed_ms > timeout_ms {
180            return Err(IntegrationError::new(
181                source,
182                IntegrationErrorKind::Timeout,
183                format!(
184                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
185                    elapsed_ms, timeout_ms
186                ),
187            )
188            .with_code("operation_timeout"));
189        }
190
191        match result {
192            Ok(value) => return Ok(value),
193            Err(err) if !err.retryable || attempt == attempts => return Err(err),
194            Err(err) => {
195                warn!(
196                    target: "shelly.integration.query",
197                    source,
198                    operation,
199                    attempt,
200                    max_attempts = attempts,
201                    tenant_id = ?context.tenant_id,
202                    trace_id = ?context.trace_id,
203                    correlation_id = context.correlation_id().unwrap_or("-"),
204                    request_id = context.request_id().unwrap_or("-"),
205                    timeout_ms,
206                    backoff_ms,
207                    error = %err,
208                    "Shelly integration retrying transient failure without blocking sleep; use run_with_contract_async for non-blocking backoff"
209                );
210                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
211            }
212        }
213    }
214
215    Err(IntegrationError::new(
216        source,
217        IntegrationErrorKind::Transient,
218        format!("{operation} retry exhausted"),
219    )
220    .with_code("retry_exhausted"))
221}
222
223pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
224where
225    F: FnMut(u32) -> IntegrationResult<T>,
226{
227    let attempts = policy.max_attempts.max(1);
228    let mut backoff_ms = policy.initial_backoff_ms;
229
230    for attempt in 1..=attempts {
231        match operation(attempt) {
232            Ok(value) => return Ok(value),
233            Err(err) if !err.retryable || attempt == attempts => return Err(err),
234            Err(_) => {
235                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
236            }
237        }
238    }
239
240    Err(IntegrationError::new(
241        "retry",
242        IntegrationErrorKind::Transient,
243        "retry exhausted",
244    ))
245}
246
247pub async fn run_with_contract_async<T, F, Fut>(
248    source: &str,
249    operation: &str,
250    contract: AdapterCallContract,
251    context: &QueryContext,
252    mut operation_fn: F,
253) -> IntegrationResult<T>
254where
255    F: FnMut(u32) -> Fut,
256    Fut: Future<Output = IntegrationResult<T>>,
257{
258    let policy = context
259        .retry_policy_override()
260        .unwrap_or(contract.retry_policy);
261    let attempts = policy.max_attempts.max(1);
262    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
263    let mut backoff_ms = policy.initial_backoff_ms;
264
265    for attempt in 1..=attempts {
266        let started_at = Instant::now();
267        let result = operation_fn(attempt).await;
268        let elapsed_ms = started_at.elapsed().as_millis() as u64;
269        if elapsed_ms > timeout_ms {
270            return Err(IntegrationError::new(
271                source,
272                IntegrationErrorKind::Timeout,
273                format!(
274                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
275                    elapsed_ms, timeout_ms
276                ),
277            )
278            .with_code("operation_timeout"));
279        }
280
281        match result {
282            Ok(value) => return Ok(value),
283            Err(err) if !err.retryable || attempt == attempts => return Err(err),
284            Err(err) => {
285                warn!(
286                    target: "shelly.integration.query",
287                    source,
288                    operation,
289                    attempt,
290                    max_attempts = attempts,
291                    tenant_id = ?context.tenant_id,
292                    trace_id = ?context.trace_id,
293                    correlation_id = context.correlation_id().unwrap_or("-"),
294                    request_id = context.request_id().unwrap_or("-"),
295                    timeout_ms,
296                    backoff_ms,
297                    error = %err,
298                    "Shelly integration retrying transient failure with non-blocking backoff"
299                );
300                if backoff_ms > 0 {
301                    sleep(Duration::from_millis(backoff_ms)).await;
302                }
303                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
304            }
305        }
306    }
307
308    Err(IntegrationError::new(
309        source,
310        IntegrationErrorKind::Transient,
311        format!("{operation} retry exhausted"),
312    )
313    .with_code("retry_exhausted"))
314}
315
316pub async fn run_with_retry_async<T, F, Fut>(
317    policy: RetryPolicy,
318    mut operation: F,
319) -> IntegrationResult<T>
320where
321    F: FnMut(u32) -> Fut,
322    Fut: Future<Output = IntegrationResult<T>>,
323{
324    let attempts = policy.max_attempts.max(1);
325    let mut backoff_ms = policy.initial_backoff_ms;
326
327    for attempt in 1..=attempts {
328        match operation(attempt).await {
329            Ok(value) => return Ok(value),
330            Err(err) if !err.retryable || attempt == attempts => return Err(err),
331            Err(_) => {
332                if backoff_ms > 0 {
333                    sleep(Duration::from_millis(backoff_ms)).await;
334                }
335                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
336            }
337        }
338    }
339
340    Err(IntegrationError::new(
341        "retry",
342        IntegrationErrorKind::Transient,
343        "retry exhausted",
344    ))
345}
346
/// Callbacks for connection setup and teardown.
///
/// Both methods default to returning `Ok(())`, so implementors override only
/// the callback they need.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Connect-side callback; the default implementation does nothing.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    /// Disconnect-side callback; the default implementation does nothing.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}
356
/// Registry of [`ConnectionLifecycleHook`]s with connect/disconnect entry points.
pub trait ConnectionLifecycle {
    /// Adds `hook` to the registry.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    /// Runs the connect side of the lifecycle.
    fn connect(&self) -> IntegrationResult<()>;
    /// Runs the disconnect side of the lifecycle.
    fn disconnect(&self) -> IntegrationResult<()>;
}
362
363#[derive(Default)]
364pub struct LifecycleHooks {
365    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
366}
367
368impl LifecycleHooks {
369    pub fn new() -> Self {
370        Self::default()
371    }
372}
373
374impl ConnectionLifecycle for LifecycleHooks {
375    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
376        self.hooks.push(hook);
377    }
378
379    fn connect(&self) -> IntegrationResult<()> {
380        for hook in &self.hooks {
381            hook.on_connect()?;
382        }
383        Ok(())
384    }
385
386    fn disconnect(&self) -> IntegrationResult<()> {
387        for hook in &self.hooks {
388            hook.on_disconnect()?;
389        }
390        Ok(())
391    }
392}
393
394#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
395pub struct QueryContext {
396    pub tenant_id: Option<String>,
397    pub trace_id: Option<String>,
398    pub tags: BTreeMap<String, String>,
399}
400
401impl QueryContext {
402    pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
403        let tenant_id = tenant_id.into();
404        let tenant_id = tenant_id.trim();
405        if tenant_id.is_empty() {
406            self.tenant_id = None;
407        } else {
408            self.tenant_id = Some(tenant_id.to_string());
409        }
410        self
411    }
412
413    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
414        self.tags.insert(key.into(), value.into());
415        self
416    }
417
418    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
419        self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
420    }
421
422    pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
423        self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
424    }
425
426    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
427        self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
428    }
429
430    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
431        self.with_tag(
432            CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
433            policy.max_attempts.to_string(),
434        )
435        .with_tag(
436            CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
437            policy.initial_backoff_ms.to_string(),
438        )
439        .with_tag(
440            CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
441            policy.max_backoff_ms.to_string(),
442        )
443    }
444
445    pub fn correlation_id(&self) -> Option<&str> {
446        self.tags
447            .get(CONTEXT_TAG_CORRELATION_ID)
448            .map(String::as_str)
449            .map(str::trim)
450            .filter(|value| !value.is_empty())
451    }
452
453    pub fn request_id(&self) -> Option<&str> {
454        self.tags
455            .get(CONTEXT_TAG_REQUEST_ID)
456            .map(String::as_str)
457            .map(str::trim)
458            .filter(|value| !value.is_empty())
459    }
460
461    pub fn timeout_ms(&self) -> Option<u64> {
462        self.tags
463            .get(CONTEXT_TAG_TIMEOUT_MS)
464            .and_then(|value| value.parse::<u64>().ok())
465            .filter(|value| *value > 0)
466    }
467
468    pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
469        let max_attempts = self
470            .tags
471            .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
472            .and_then(|value| value.parse::<u32>().ok())?;
473        let initial_backoff_ms = self
474            .tags
475            .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
476            .and_then(|value| value.parse::<u64>().ok())
477            .unwrap_or(0);
478        let max_backoff_ms = self
479            .tags
480            .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
481            .and_then(|value| value.parse::<u64>().ok())
482            .unwrap_or(initial_backoff_ms);
483        Some(RetryPolicy {
484            max_attempts: max_attempts.max(1),
485            initial_backoff_ms,
486            max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
487        })
488    }
489}
490
/// Typed request/response boundary that the adapter traits in this module build on.
pub trait TypedQueryBoundary {
    /// Request type accepted by [`TypedQueryBoundary::execute`].
    type Request: Send + Sync;
    /// Response type produced on success.
    type Response: Send + Sync;

    /// Executes `request` with the given call `context`.
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
501
502#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
503pub struct SqlCommand {
504    pub statement: String,
505    pub params: Vec<Value>,
506}
507
508impl SqlCommand {
509    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
510        Self {
511            statement: statement.into(),
512            params,
513        }
514    }
515}
516
517#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub struct DataWindowRequest {
519    pub dataset: String,
520    pub offset: usize,
521    pub limit: usize,
522    #[serde(default, skip_serializing_if = "Option::is_none")]
523    pub query_token: Option<String>,
524    #[serde(default, skip_serializing_if = "Option::is_none")]
525    pub window_token: Option<String>,
526    #[serde(default)]
527    pub wire_format: WireFormatProfile,
528}
529
530impl DataWindowRequest {
531    pub fn new(dataset: impl Into<String>, offset: usize, limit: usize) -> Self {
532        Self {
533            dataset: dataset.into(),
534            offset,
535            limit: limit.max(1),
536            query_token: None,
537            window_token: None,
538            wire_format: WireFormatProfile::Json,
539        }
540    }
541
542    pub fn with_query_token(mut self, query_token: impl Into<String>) -> Self {
543        self.query_token = Some(query_token.into());
544        self
545    }
546
547    pub fn with_window_token(mut self, window_token: impl Into<String>) -> Self {
548        self.window_token = Some(window_token.into());
549        self
550    }
551
552    pub fn compact(mut self) -> Self {
553        self.wire_format = WireFormatProfile::Compact;
554        self
555    }
556}
557
/// One window of rows returned for a [`DataWindowRequest`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowResponse {
    /// Dataset name.
    pub dataset: String,
    /// Offset of the window.
    pub offset: usize,
    /// Limit of the window.
    pub limit: usize,
    /// Total row count reported for the query.
    pub total_rows: usize,
    /// Query token for this response.
    pub query_token: String,
    /// Window token for this response.
    pub window_token: String,
    /// Rows in the window.
    pub rows: Vec<Row>,
    /// Compact encoding of the rows, when present; omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compact_rows: Option<CompactRowsPayload>,
}
570
571pub trait SingleStoreAdapter:
572    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
573{
574    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
575        let started_at = Instant::now();
576        let statement = query.statement.as_str();
577        let param_count = query.params.len();
578        let result = self.execute(&query, context);
579        match &result {
580            Ok(rows) => info!(
581                target: "shelly.integration.query",
582                source = "singlestore",
583                operation = "run_query",
584                tenant_id = ?context.tenant_id,
585                trace_id = ?context.trace_id,
586                tag_count = context.tags.len(),
587                statement,
588                param_count,
589                row_count = rows.len(),
590                duration_ms = started_at.elapsed().as_millis() as u64,
591                "Shelly integration query executed"
592            ),
593            Err(err) => warn!(
594                target: "shelly.integration.query",
595                source = "singlestore",
596                operation = "run_query",
597                tenant_id = ?context.tenant_id,
598                trace_id = ?context.trace_id,
599                tag_count = context.tags.len(),
600                statement,
601                param_count,
602                duration_ms = started_at.elapsed().as_millis() as u64,
603                error = %err,
604                "Shelly integration query failed"
605            ),
606        }
607        result
608    }
609
610    fn run_high_volume_window_query(
611        &self,
612        query: SqlCommand,
613        request: DataWindowRequest,
614        context: &QueryContext,
615    ) -> IntegrationResult<DataWindowResponse> {
616        let rows = self.run_query(query, context)?;
617        Ok(materialize_window_response(
618            "singlestore",
619            request,
620            rows,
621            None,
622        ))
623    }
624}
625
626pub trait ClickHouseAdapter: TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> {
627    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
628        let started_at = Instant::now();
629        let statement = query.statement.as_str();
630        let param_count = query.params.len();
631        let result = self.execute(&query, context);
632        match &result {
633            Ok(rows) => info!(
634                target: "shelly.integration.query",
635                source = "clickhouse",
636                operation = "run_query",
637                tenant_id = ?context.tenant_id,
638                trace_id = ?context.trace_id,
639                correlation_id = context.correlation_id().unwrap_or("-"),
640                request_id = context.request_id().unwrap_or("-"),
641                tag_count = context.tags.len(),
642                statement,
643                param_count,
644                row_count = rows.len(),
645                duration_ms = started_at.elapsed().as_millis() as u64,
646                "Shelly integration query executed"
647            ),
648            Err(err) => warn!(
649                target: "shelly.integration.query",
650                source = "clickhouse",
651                operation = "run_query",
652                tenant_id = ?context.tenant_id,
653                trace_id = ?context.trace_id,
654                correlation_id = context.correlation_id().unwrap_or("-"),
655                request_id = context.request_id().unwrap_or("-"),
656                tag_count = context.tags.len(),
657                statement,
658                param_count,
659                duration_ms = started_at.elapsed().as_millis() as u64,
660                error = %err,
661                "Shelly integration query failed"
662            ),
663        }
664        result
665    }
666
667    fn run_high_volume_window_query(
668        &self,
669        query: SqlCommand,
670        request: DataWindowRequest,
671        context: &QueryContext,
672    ) -> IntegrationResult<DataWindowResponse> {
673        let rows = self.run_query(query, context)?;
674        Ok(materialize_window_response(
675            "clickhouse",
676            request,
677            rows,
678            None,
679        ))
680    }
681}
682
683pub trait BigQueryAdapter: TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> {
684    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
685        let started_at = Instant::now();
686        let statement = query.statement.as_str();
687        let param_count = query.params.len();
688        let result = self.execute(&query, context);
689        match &result {
690            Ok(rows) => info!(
691                target: "shelly.integration.query",
692                source = "bigquery",
693                operation = "run_query",
694                tenant_id = ?context.tenant_id,
695                trace_id = ?context.trace_id,
696                correlation_id = context.correlation_id().unwrap_or("-"),
697                request_id = context.request_id().unwrap_or("-"),
698                tag_count = context.tags.len(),
699                statement,
700                param_count,
701                row_count = rows.len(),
702                duration_ms = started_at.elapsed().as_millis() as u64,
703                "Shelly integration query executed"
704            ),
705            Err(err) => warn!(
706                target: "shelly.integration.query",
707                source = "bigquery",
708                operation = "run_query",
709                tenant_id = ?context.tenant_id,
710                trace_id = ?context.trace_id,
711                correlation_id = context.correlation_id().unwrap_or("-"),
712                request_id = context.request_id().unwrap_or("-"),
713                tag_count = context.tags.len(),
714                statement,
715                param_count,
716                duration_ms = started_at.elapsed().as_millis() as u64,
717                error = %err,
718                "Shelly integration query failed"
719            ),
720        }
721        result
722    }
723
724    fn run_high_volume_window_query(
725        &self,
726        query: SqlCommand,
727        request: DataWindowRequest,
728        context: &QueryContext,
729    ) -> IntegrationResult<DataWindowResponse> {
730        let rows = self.run_query(query, context)?;
731        Ok(materialize_window_response("bigquery", request, rows, None))
732    }
733}
734
735#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
736pub struct SearchRequest {
737    pub index: String,
738    pub text: String,
739    pub filters: Vec<crate::query::Filter>,
740    pub page: usize,
741    pub per_page: usize,
742}
743
744impl SearchRequest {
745    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
746        Self {
747            index: index.into(),
748            text: text.into(),
749            filters: Vec::new(),
750            page: 1,
751            per_page: 25,
752        }
753    }
754
755    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
756        self.filters.push(filter);
757        self
758    }
759}
760
/// Result of a [`SearchRequest`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Hit count reported by the backend; `search_window` reports it as `total_rows`.
    pub total_hits: usize,
    /// Rows returned for the requested page.
    pub rows: Vec<StoredRow>,
}
766
767pub trait OpenSearchAdapter:
768    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
769{
770    fn search(
771        &self,
772        request: SearchRequest,
773        context: &QueryContext,
774    ) -> IntegrationResult<SearchResponse> {
775        let started_at = Instant::now();
776        let index = request.index.as_str();
777        let text = request.text.as_str();
778        let filter_count = request.filters.len();
779        let page = request.page;
780        let per_page = request.per_page;
781        let result = self.execute(&request, context);
782        match &result {
783            Ok(response) => info!(
784                target: "shelly.integration.query",
785                source = "opensearch",
786                operation = "search",
787                tenant_id = ?context.tenant_id,
788                trace_id = ?context.trace_id,
789                tag_count = context.tags.len(),
790                index,
791                text,
792                filter_count,
793                page,
794                per_page,
795                row_count = response.rows.len(),
796                total_hits = response.total_hits,
797                duration_ms = started_at.elapsed().as_millis() as u64,
798                "Shelly integration query executed"
799            ),
800            Err(err) => warn!(
801                target: "shelly.integration.query",
802                source = "opensearch",
803                operation = "search",
804                tenant_id = ?context.tenant_id,
805                trace_id = ?context.trace_id,
806                tag_count = context.tags.len(),
807                index,
808                text,
809                filter_count,
810                page,
811                per_page,
812                duration_ms = started_at.elapsed().as_millis() as u64,
813                error = %err,
814                "Shelly integration query failed"
815            ),
816        }
817        result
818    }
819
820    fn search_window(
821        &self,
822        mut request: SearchRequest,
823        window: DataWindowRequest,
824        context: &QueryContext,
825    ) -> IntegrationResult<DataWindowResponse> {
826        let limit = window.limit.max(1);
827        request.page = (window.offset / limit).saturating_add(1);
828        request.per_page = limit;
829        let response = self.search(request, context)?;
830        let rows = response
831            .rows
832            .into_iter()
833            .map(|row| row.data)
834            .collect::<Vec<_>>();
835        let query_token = window
836            .query_token
837            .unwrap_or_else(|| format!("opensearch:{}:{}", window.dataset, response.total_hits));
838        let window_token = window.window_token.unwrap_or_else(|| {
839            format!(
840                "{query_token}:offset={}:limit={limit}:rows={}",
841                window.offset,
842                rows.len()
843            )
844        });
845        let compact_rows =
846            (window.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
847        Ok(DataWindowResponse {
848            dataset: window.dataset,
849            offset: window.offset,
850            limit,
851            total_rows: response.total_hits,
852            query_token,
853            window_token,
854            rows,
855            compact_rows,
856        })
857    }
858}
859
/// Destination that accepts [`AnalyticsEvent`]s.
pub trait AnalyticsSink: Send + Sync {
    /// Delivers `event` to the sink.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
863
864#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
865pub struct AnalyticsEvent {
866    pub namespace: String,
867    pub name: String,
868    pub payload: Value,
869}
870
871impl AnalyticsEvent {
872    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
873        Self {
874            namespace: namespace.into(),
875            name: name.into(),
876            payload,
877        }
878    }
879}
880
881#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
882pub struct JobRequest {
883    pub workflow: String,
884    pub payload: Value,
885    pub idempotency_key: String,
886    pub metadata: BTreeMap<String, String>,
887}
888
889impl JobRequest {
890    pub fn new(
891        workflow: impl Into<String>,
892        payload: Value,
893        idempotency_key: impl Into<String>,
894    ) -> Self {
895        Self {
896            workflow: workflow.into(),
897            payload,
898            idempotency_key: idempotency_key.into(),
899            metadata: BTreeMap::new(),
900        }
901    }
902}
903
/// State reported in a [`JobStatus`], serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}
913
/// Snapshot of a job as reported by a [`JobOrchestrator`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    /// Job identifier.
    pub id: String,
    /// Current state.
    pub state: JobState,
    /// Attempt counter reported for the job.
    pub attempts: u32,
    /// Result value, when one is available.
    pub result: Option<Value>,
    /// Error details, when one is available.
    pub error: Option<IntegrationError>,
}
922
/// Handle returned when a job is enqueued.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    /// Job identifier.
    pub id: String,
    /// Workflow name.
    pub workflow: String,
    /// Idempotency key.
    pub idempotency_key: String,
}
929
/// Shared, thread-safe callback that receives a [`JobStatus`] by reference.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;

/// Job submission and tracking interface.
///
/// NOTE(review): the contracts below are inferred from the signatures only;
/// confirm the details against the concrete implementations.
pub trait JobOrchestrator: Send + Sync {
    /// Submits `request` and returns a handle for the job.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the status of the job identified by `id`.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Polls the job identified by `id`; presumably re-checks up to `attempts`
    /// times with `backoff_ms` between checks — confirm with implementors.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    /// Registers `callback`; presumably invoked when a job completes — confirm
    /// with implementors.
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
938
/// Workflow-named view over job orchestration.
///
/// This file provides a blanket implementation for every [`JobOrchestrator`]
/// that forwards each method to its `enqueue` / `status` / `poll` counterpart.
pub trait TriggerDevAdapter: Send + Sync {
    /// Starts the workflow described by `request`.
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the status of the workflow run identified by `id`.
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Polls the workflow run identified by `id` with the given attempt count
    /// and backoff.
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus>;
}
949
950impl<T> TriggerDevAdapter for T
951where
952    T: JobOrchestrator + Send + Sync,
953{
954    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
955        self.enqueue(request)
956    }
957
958    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
959        self.status(id)
960    }
961
962    fn poll_workflow(
963        &self,
964        id: &str,
965        attempts: u32,
966        backoff_ms: u64,
967    ) -> IntegrationResult<JobStatus> {
968        self.poll(id, attempts, backoff_ms)
969    }
970}
971
972#[derive(Clone)]
973pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
974    sink: Arc<S>,
975    namespace: String,
976}
977
978impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
979    pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
980        Self {
981            sink,
982            namespace: namespace.into(),
983        }
984    }
985
986    pub fn emit(
987        &self,
988        name: impl Into<String>,
989        payload: Value,
990        context: &QueryContext,
991    ) -> IntegrationResult<()> {
992        let mut envelope = serde_json::Map::new();
993        envelope.insert("payload".to_string(), payload);
994        envelope.insert(
995            "tenant_id".to_string(),
996            context
997                .tenant_id
998                .as_ref()
999                .map(|value| Value::String(value.clone()))
1000                .unwrap_or(Value::Null),
1001        );
1002        envelope.insert(
1003            "trace_id".to_string(),
1004            context
1005                .trace_id
1006                .as_ref()
1007                .map(|value| Value::String(value.clone()))
1008                .unwrap_or(Value::Null),
1009        );
1010        envelope.insert(
1011            "correlation_id".to_string(),
1012            context
1013                .correlation_id()
1014                .map(|value| Value::String(value.to_string()))
1015                .unwrap_or(Value::Null),
1016        );
1017        envelope.insert(
1018            "request_id".to_string(),
1019            context
1020                .request_id()
1021                .map(|value| Value::String(value.to_string()))
1022                .unwrap_or(Value::Null),
1023        );
1024        envelope.insert("tags".to_string(), serde_json::json!(context.tags));
1025
1026        self.sink.send_event(AnalyticsEvent::new(
1027            self.namespace.clone(),
1028            name.into(),
1029            Value::Object(envelope),
1030        ))
1031    }
1032}
1033
1034#[derive(Debug, Clone, Default)]
1035pub struct InMemorySingleStoreAdapter {
1036    rows: Vec<Row>,
1037}
1038
1039impl InMemorySingleStoreAdapter {
1040    pub fn new(rows: Vec<Row>) -> Self {
1041        Self { rows }
1042    }
1043}
1044
1045impl TypedQueryBoundary for InMemorySingleStoreAdapter {
1046    type Request = SqlCommand;
1047    type Response = Vec<Row>;
1048
1049    fn execute(
1050        &self,
1051        request: &Self::Request,
1052        _context: &QueryContext,
1053    ) -> IntegrationResult<Self::Response> {
1054        if request.statement.trim().is_empty() {
1055            return Err(IntegrationError::new(
1056                "singlestore",
1057                IntegrationErrorKind::InvalidInput,
1058                "empty SQL statement",
1059            )
1060            .with_code("empty_statement"));
1061        }
1062        Ok(self.rows.clone())
1063    }
1064}
1065
1066impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
1067
1068#[derive(Debug, Clone, Default)]
1069pub struct InMemoryClickHouseAdapter {
1070    rows: Vec<Row>,
1071}
1072
1073impl InMemoryClickHouseAdapter {
1074    pub fn new(rows: Vec<Row>) -> Self {
1075        Self { rows }
1076    }
1077}
1078
1079impl TypedQueryBoundary for InMemoryClickHouseAdapter {
1080    type Request = SqlCommand;
1081    type Response = Vec<Row>;
1082
1083    fn execute(
1084        &self,
1085        request: &Self::Request,
1086        _context: &QueryContext,
1087    ) -> IntegrationResult<Self::Response> {
1088        if request.statement.trim().is_empty() {
1089            return Err(IntegrationError::new(
1090                "clickhouse",
1091                IntegrationErrorKind::InvalidInput,
1092                "empty SQL statement",
1093            )
1094            .with_code("empty_statement"));
1095        }
1096        Ok(self.rows.clone())
1097    }
1098}
1099
1100impl ClickHouseAdapter for InMemoryClickHouseAdapter {}
1101
1102#[derive(Debug, Clone, Default)]
1103pub struct InMemoryBigQueryAdapter {
1104    rows: Vec<Row>,
1105}
1106
1107impl InMemoryBigQueryAdapter {
1108    pub fn new(rows: Vec<Row>) -> Self {
1109        Self { rows }
1110    }
1111}
1112
1113impl TypedQueryBoundary for InMemoryBigQueryAdapter {
1114    type Request = SqlCommand;
1115    type Response = Vec<Row>;
1116
1117    fn execute(
1118        &self,
1119        request: &Self::Request,
1120        _context: &QueryContext,
1121    ) -> IntegrationResult<Self::Response> {
1122        if request.statement.trim().is_empty() {
1123            return Err(IntegrationError::new(
1124                "bigquery",
1125                IntegrationErrorKind::InvalidInput,
1126                "empty SQL statement",
1127            )
1128            .with_code("empty_statement"));
1129        }
1130        Ok(self.rows.clone())
1131    }
1132}
1133
1134impl BigQueryAdapter for InMemoryBigQueryAdapter {}
1135
1136#[derive(Debug, Clone, Default)]
1137pub struct InMemoryOpenSearchAdapter {
1138    rows: Vec<StoredRow>,
1139}
1140
1141impl InMemoryOpenSearchAdapter {
1142    pub fn new(rows: Vec<StoredRow>) -> Self {
1143        Self { rows }
1144    }
1145}
1146
1147impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
1148    type Request = SearchRequest;
1149    type Response = SearchResponse;
1150
1151    fn execute(
1152        &self,
1153        request: &Self::Request,
1154        _context: &QueryContext,
1155    ) -> IntegrationResult<Self::Response> {
1156        if request.index.trim().is_empty() {
1157            return Err(IntegrationError::new(
1158                "opensearch",
1159                IntegrationErrorKind::InvalidInput,
1160                "search index must not be empty",
1161            )
1162            .with_code("empty_index"));
1163        }
1164        let needle = request.text.trim().to_lowercase();
1165        let mut filtered = self
1166            .rows
1167            .iter()
1168            .filter(|row| {
1169                if needle.is_empty() {
1170                    return true;
1171                }
1172                row.data.values().any(|value| {
1173                    value
1174                        .as_str()
1175                        .map(|text| text.to_lowercase().contains(&needle))
1176                        .unwrap_or(false)
1177                })
1178            })
1179            .cloned()
1180            .collect::<Vec<_>>();
1181        let total_hits = filtered.len();
1182        let page = request.page.max(1);
1183        let per_page = request.per_page.max(1);
1184        let offset = (page - 1) * per_page;
1185        filtered = filtered.into_iter().skip(offset).take(per_page).collect();
1186        Ok(SearchResponse {
1187            total_hits,
1188            rows: filtered,
1189        })
1190    }
1191}
1192
1193impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
1194
1195#[derive(Debug, Default, Clone)]
1196pub struct InMemoryAxiomSink {
1197    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
1198}
1199
1200impl InMemoryAxiomSink {
1201    pub fn events(&self) -> Vec<AnalyticsEvent> {
1202        self.events
1203            .lock()
1204            .map(|events| events.clone())
1205            .unwrap_or_default()
1206    }
1207}
1208
1209impl AnalyticsSink for InMemoryAxiomSink {
1210    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
1211        let started_at = Instant::now();
1212        let namespace = event.namespace.clone();
1213        let name = event.name.clone();
1214        self.events
1215            .lock()
1216            .map_err(|_| {
1217                IntegrationError::new(
1218                    "axiom",
1219                    IntegrationErrorKind::Unavailable,
1220                    "analytics sink lock poisoned",
1221                )
1222            })?
1223            .push(event);
1224        info!(
1225            target: "shelly.integration.query",
1226            source = "axiom",
1227            operation = "send_event",
1228            namespace,
1229            event_name = name,
1230            duration_ms = started_at.elapsed().as_millis() as u64,
1231            "Shelly integration call executed"
1232        );
1233        Ok(())
1234    }
1235}
1236
1237#[derive(Default)]
1238pub struct InMemoryJobOrchestrator {
1239    statuses: Mutex<HashMap<String, JobStatus>>,
1240    callbacks: Mutex<Vec<JobCompletionCallback>>,
1241    next_id: Mutex<u64>,
1242}
1243
1244impl InMemoryJobOrchestrator {
1245    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
1246        let status = self.with_status_mut(id, |status| {
1247            status.state = JobState::Succeeded;
1248            status.result = Some(result);
1249            status.error = None;
1250        })?;
1251        self.notify(&status);
1252        Ok(())
1253    }
1254
1255    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
1256        let status = self.with_status_mut(id, |status| {
1257            status.state = JobState::Failed;
1258            status.result = None;
1259            status.error = Some(error);
1260        })?;
1261        self.notify(&status);
1262        Ok(())
1263    }
1264
1265    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
1266    where
1267        F: FnOnce(&mut JobStatus),
1268    {
1269        let mut statuses = self.statuses.lock().map_err(|_| {
1270            IntegrationError::new(
1271                "trigger",
1272                IntegrationErrorKind::Unavailable,
1273                "job status lock poisoned",
1274            )
1275        })?;
1276        let Some(status) = statuses.get_mut(id) else {
1277            return Err(IntegrationError::new(
1278                "trigger",
1279                IntegrationErrorKind::InvalidInput,
1280                "job not found",
1281            )
1282            .with_code("job_not_found"));
1283        };
1284        status.attempts = status.attempts.saturating_add(1);
1285        update(status);
1286        Ok(status.clone())
1287    }
1288
1289    fn notify(&self, status: &JobStatus) {
1290        if let Ok(callbacks) = self.callbacks.lock() {
1291            for callback in callbacks.iter() {
1292                callback(status);
1293            }
1294        }
1295    }
1296
1297    fn is_terminal(state: JobState) -> bool {
1298        matches!(
1299            state,
1300            JobState::Succeeded | JobState::Failed | JobState::Canceled
1301        )
1302    }
1303}
1304
1305impl JobOrchestrator for InMemoryJobOrchestrator {
1306    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
1307        let started_at = Instant::now();
1308        let workflow = request.workflow.clone();
1309        let idempotency_key = request.idempotency_key.clone();
1310        let mut next_id = self.next_id.lock().map_err(|_| {
1311            IntegrationError::new(
1312                "trigger",
1313                IntegrationErrorKind::Unavailable,
1314                "job id lock poisoned",
1315            )
1316        })?;
1317        *next_id = next_id.saturating_add(1);
1318        let id = format!("job-{next_id}");
1319        let status = JobStatus {
1320            id: id.clone(),
1321            state: JobState::Queued,
1322            attempts: 0,
1323            result: None,
1324            error: None,
1325        };
1326        self.statuses
1327            .lock()
1328            .map_err(|_| {
1329                IntegrationError::new(
1330                    "trigger",
1331                    IntegrationErrorKind::Unavailable,
1332                    "job status lock poisoned",
1333                )
1334            })?
1335            .insert(id.clone(), status);
1336        let handle = JobHandle {
1337            id,
1338            workflow: request.workflow,
1339            idempotency_key: request.idempotency_key,
1340        };
1341        info!(
1342            target: "shelly.integration.query",
1343            source = "trigger",
1344            operation = "enqueue",
1345            workflow,
1346            idempotency_key,
1347            job_id = handle.id.as_str(),
1348            duration_ms = started_at.elapsed().as_millis() as u64,
1349            "Shelly integration call executed"
1350        );
1351        Ok(handle)
1352    }
1353
1354    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
1355        let started_at = Instant::now();
1356        let result = self
1357            .statuses
1358            .lock()
1359            .map_err(|_| {
1360                IntegrationError::new(
1361                    "trigger",
1362                    IntegrationErrorKind::Unavailable,
1363                    "job status lock poisoned",
1364                )
1365            })?
1366            .get(id)
1367            .cloned()
1368            .ok_or_else(|| {
1369                IntegrationError::new(
1370                    "trigger",
1371                    IntegrationErrorKind::InvalidInput,
1372                    "job not found",
1373                )
1374                .with_code("job_not_found")
1375            });
1376        match &result {
1377            Ok(status) => info!(
1378                target: "shelly.integration.query",
1379                source = "trigger",
1380                operation = "status",
1381                job_id = id,
1382                job_state = ?status.state,
1383                attempts = status.attempts,
1384                duration_ms = started_at.elapsed().as_millis() as u64,
1385                "Shelly integration call executed"
1386            ),
1387            Err(err) => warn!(
1388                target: "shelly.integration.query",
1389                source = "trigger",
1390                operation = "status",
1391                job_id = id,
1392                duration_ms = started_at.elapsed().as_millis() as u64,
1393                error = %err,
1394                "Shelly integration call failed"
1395            ),
1396        }
1397        result
1398    }
1399
1400    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1401        let started_at = Instant::now();
1402        let attempts = attempts.max(1);
1403        let mut polled = 0u32;
1404        for current in 1..=attempts {
1405            let status = self.status(id)?;
1406            polled = current;
1407            if Self::is_terminal(status.state) || current == attempts {
1408                info!(
1409                    target: "shelly.integration.query",
1410                    source = "trigger",
1411                    operation = "poll",
1412                    job_id = id,
1413                    attempts = polled,
1414                    backoff_ms,
1415                    terminal = Self::is_terminal(status.state),
1416                    job_state = ?status.state,
1417                    duration_ms = started_at.elapsed().as_millis() as u64,
1418                    "Shelly integration call executed"
1419                );
1420                return Ok(status);
1421            }
1422            if backoff_ms > 0 {
1423                thread::sleep(Duration::from_millis(backoff_ms));
1424            }
1425        }
1426
1427        let result = self.status(id);
1428        match &result {
1429            Ok(status) => info!(
1430                target: "shelly.integration.query",
1431                source = "trigger",
1432                operation = "poll",
1433                job_id = id,
1434                attempts = polled,
1435                backoff_ms,
1436                terminal = Self::is_terminal(status.state),
1437                job_state = ?status.state,
1438                duration_ms = started_at.elapsed().as_millis() as u64,
1439                "Shelly integration call executed"
1440            ),
1441            Err(err) => warn!(
1442                target: "shelly.integration.query",
1443                source = "trigger",
1444                operation = "poll",
1445                job_id = id,
1446                attempts = polled,
1447                backoff_ms,
1448                duration_ms = started_at.elapsed().as_millis() as u64,
1449                error = %err,
1450                "Shelly integration call failed"
1451            ),
1452        }
1453        result
1454    }
1455
1456    fn register_completion_callback(&self, callback: JobCompletionCallback) {
1457        if let Ok(mut callbacks) = self.callbacks.lock() {
1458            callbacks.push(callback);
1459            info!(
1460                target: "shelly.integration.query",
1461                source = "trigger",
1462                operation = "register_completion_callback",
1463                callback_count = callbacks.len(),
1464                "Shelly integration callback registered"
1465            );
1466        } else {
1467            warn!(
1468                target: "shelly.integration.query",
1469                source = "trigger",
1470                operation = "register_completion_callback",
1471                "Shelly integration callback registration failed"
1472            );
1473        }
1474    }
1475}
1476
1477#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1478pub struct AdapterConformanceCheck {
1479    pub name: String,
1480    pub passed: bool,
1481    pub detail: String,
1482}
1483
1484#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
1485pub struct AdapterConformanceReport {
1486    pub checks: Vec<AdapterConformanceCheck>,
1487}
1488
1489impl AdapterConformanceReport {
1490    pub fn passed(&self) -> bool {
1491        self.checks.iter().all(|check| check.passed)
1492    }
1493
1494    pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1495        self.checks.push(AdapterConformanceCheck {
1496            name: name.into(),
1497            passed: true,
1498            detail: detail.into(),
1499        });
1500    }
1501
1502    pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1503        self.checks.push(AdapterConformanceCheck {
1504            name: name.into(),
1505            passed: false,
1506            detail: detail.into(),
1507        });
1508    }
1509}
1510
1511pub fn run_adapter_conformance_suite(
1512    singlestore: &dyn SingleStoreAdapter,
1513    clickhouse: &dyn ClickHouseAdapter,
1514    bigquery: &dyn BigQueryAdapter,
1515    opensearch: &dyn OpenSearchAdapter,
1516    trigger: &dyn TriggerDevAdapter,
1517    analytics: &dyn AnalyticsSink,
1518    context: &QueryContext,
1519) -> AdapterConformanceReport {
1520    let mut report = AdapterConformanceReport::default();
1521
1522    match singlestore.run_query(
1523        SqlCommand::new(
1524            "SELECT tenant, active_accounts FROM tenant_rollup",
1525            Vec::new(),
1526        ),
1527        context,
1528    ) {
1529        Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
1530        Err(err) => report.add_fail("singlestore.query", err.to_string()),
1531    }
1532
1533    match clickhouse.run_query(
1534        SqlCommand::new("SELECT account, events FROM account_events", Vec::new()),
1535        context,
1536    ) {
1537        Ok(rows) => report.add_pass("clickhouse.query", format!("rows={}", rows.len())),
1538        Err(err) => report.add_fail("clickhouse.query", err.to_string()),
1539    }
1540
1541    match bigquery.run_query(
1542        SqlCommand::new("SELECT account, arr FROM analytics_rollup", Vec::new()),
1543        context,
1544    ) {
1545        Ok(rows) => report.add_pass("bigquery.query", format!("rows={}", rows.len())),
1546        Err(err) => report.add_fail("bigquery.query", err.to_string()),
1547    }
1548
1549    match opensearch.search(SearchRequest::new("accounts", ""), context) {
1550        Ok(response) => report.add_pass(
1551            "opensearch.search",
1552            format!(
1553                "rows={} total_hits={}",
1554                response.rows.len(),
1555                response.total_hits
1556            ),
1557        ),
1558        Err(err) => report.add_fail("opensearch.search", err.to_string()),
1559    }
1560
1561    match trigger.trigger_workflow(JobRequest::new(
1562        "adapter_conformance_probe",
1563        serde_json::json!({"probe": true}),
1564        "adapter-conformance-probe",
1565    )) {
1566        Ok(handle) => {
1567            report.add_pass("trigger.enqueue", format!("id={}", handle.id));
1568            match trigger.workflow_status(&handle.id) {
1569                Ok(status) => report.add_pass(
1570                    "trigger.status",
1571                    format!("state={:?} attempts={}", status.state, status.attempts),
1572                ),
1573                Err(err) => report.add_fail("trigger.status", err.to_string()),
1574            }
1575        }
1576        Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
1577    }
1578
1579    let analytics_event = AnalyticsEvent::new(
1580        "adapter_conformance",
1581        "probe",
1582        serde_json::json!({
1583            "trace_id": context.trace_id,
1584            "correlation_id": context.correlation_id(),
1585            "request_id": context.request_id(),
1586        }),
1587    );
1588    match analytics.send_event(analytics_event) {
1589        Ok(()) => report.add_pass("analytics.emit", "event accepted"),
1590        Err(err) => report.add_fail("analytics.emit", err.to_string()),
1591    }
1592
1593    let context_correlation_check =
1594        context.correlation_id().is_some() && context.request_id().is_some();
1595    if context_correlation_check {
1596        report.add_pass(
1597            "context.correlation",
1598            "correlation_id and request_id present",
1599        );
1600    } else {
1601        report.add_fail(
1602            "context.correlation",
1603            "missing correlation_id/request_id; set QueryContext::with_correlation_id + with_request_id",
1604        );
1605    }
1606
1607    let retry_override_context = context.clone().with_retry_policy(RetryPolicy {
1608        max_attempts: 2,
1609        initial_backoff_ms: 0,
1610        max_backoff_ms: 0,
1611    });
1612    let mut retry_attempts = 0u32;
1613    match run_with_contract(
1614        "conformance",
1615        "retry_override_probe",
1616        AdapterCallContract::default_query(),
1617        &retry_override_context,
1618        |attempt| {
1619            retry_attempts = attempt;
1620            if attempt == 1 {
1621                Err(IntegrationError::new(
1622                    "conformance",
1623                    IntegrationErrorKind::Transient,
1624                    "transient probe failure",
1625                ))
1626            } else {
1627                Ok("ok")
1628            }
1629        },
1630    ) {
1631        Ok(_) if retry_attempts == 2 => report.add_pass(
1632            "contract.retry_override",
1633            "retry override honored with bounded attempts",
1634        ),
1635        Ok(_) => report.add_fail(
1636            "contract.retry_override",
1637            format!("unexpected attempts={retry_attempts} expected=2"),
1638        ),
1639        Err(err) => report.add_fail("contract.retry_override", err.to_string()),
1640    }
1641
1642    let timeout_context = context.clone().with_timeout_ms(1);
1643    match run_with_contract(
1644        "conformance",
1645        "timeout_probe",
1646        AdapterCallContract::default_query(),
1647        &timeout_context,
1648        |_| {
1649            thread::sleep(Duration::from_millis(5));
1650            Ok::<_, IntegrationError>("done")
1651        },
1652    ) {
1653        Err(err) if err.kind == IntegrationErrorKind::Timeout => {
1654            report.add_pass("contract.timeout", "timeout classification validated")
1655        }
1656        Err(err) => report.add_fail("contract.timeout", err.to_string()),
1657        Ok(_) => report.add_fail(
1658            "contract.timeout",
1659            "expected timeout but operation returned success",
1660        ),
1661    }
1662
1663    report
1664}
1665
1666pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1667    DataError::Integration(format!("[{}] {}", source.into(), err))
1668}
1669
1670pub fn map_integration_result<T>(
1671    source: impl Into<String>,
1672    result: IntegrationResult<T>,
1673) -> DataResult<T> {
1674    result.map_err(|err| map_integration_error(source, err))
1675}
1676
1677pub fn query_from_search(request: &SearchRequest) -> Query {
1678    let mut query = Query::new().paginate(request.page, request.per_page);
1679    for filter in &request.filters {
1680        query = query.where_filter(filter.clone());
1681    }
1682    query
1683}
1684
1685fn materialize_window_response(
1686    source: &str,
1687    request: DataWindowRequest,
1688    rows: Vec<Row>,
1689    total_rows_hint: Option<usize>,
1690) -> DataWindowResponse {
1691    let total_rows = total_rows_hint.unwrap_or(rows.len());
1692    let offset = request.offset.min(total_rows);
1693    let limit = request.limit.max(1);
1694    let window_rows = rows
1695        .into_iter()
1696        .skip(offset)
1697        .take(limit)
1698        .collect::<Vec<_>>();
1699    let query_token = request
1700        .query_token
1701        .unwrap_or_else(|| format!("{source}:{}:{}", request.dataset, total_rows));
1702    let window_token = request.window_token.unwrap_or_else(|| {
1703        format!(
1704            "{query_token}:offset={offset}:limit={limit}:rows={}",
1705            window_rows.len()
1706        )
1707    });
1708    let compact_rows = (request.wire_format == WireFormatProfile::Compact)
1709        .then(|| encode_compact_rows(&window_rows));
1710
1711    DataWindowResponse {
1712        dataset: request.dataset,
1713        offset,
1714        limit,
1715        total_rows,
1716        query_token,
1717        window_token,
1718        rows: window_rows,
1719        compact_rows,
1720    }
1721}
1722
1723fn encode_compact_rows(rows: &[Row]) -> CompactRowsPayload {
1724    let mut columns: Vec<String> = Vec::new();
1725    for row in rows {
1726        for key in row.keys() {
1727            if !columns.iter().any(|column| column == key) {
1728                columns.push(key.clone());
1729            }
1730        }
1731    }
1732
1733    let encoded_rows = rows
1734        .iter()
1735        .map(|row| {
1736            columns
1737                .iter()
1738                .map(|column| row.get(column).cloned().unwrap_or(Value::Null))
1739                .collect::<Vec<_>>()
1740        })
1741        .collect::<Vec<_>>();
1742
1743    CompactRowsPayload {
1744        columns,
1745        rows: encoded_rows,
1746    }
1747}
1748
1749#[cfg(test)]
1750mod tests {
1751    use super::{
1752        map_integration_result, run_adapter_conformance_suite, run_with_contract,
1753        run_with_contract_async, run_with_retry, run_with_retry_async, AdapterCallContract,
1754        AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge, BigQueryAdapter, ClickHouseAdapter,
1755        ConnectionLifecycle, ConnectionLifecycleHook, DataWindowRequest, InMemoryAxiomSink,
1756        InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryJobOrchestrator,
1757        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
1758        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
1759        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
1760        SearchResponse, SingleStoreAdapter, SqlCommand, TriggerDevAdapter, TypedQueryBoundary,
1761        CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID, CONTEXT_TAG_TIMEOUT_MS,
1762    };
1763    use serde_json::{json, Value};
1764    use std::sync::{Arc, Mutex};
1765
1766    struct CountingHook {
1767        connects: Arc<Mutex<u32>>,
1768        disconnects: Arc<Mutex<u32>>,
1769    }
1770
1771    impl ConnectionLifecycleHook for CountingHook {
1772        fn on_connect(&self) -> super::IntegrationResult<()> {
1773            let mut guard = self.connects.lock().unwrap();
1774            *guard += 1;
1775            Ok(())
1776        }
1777
1778        fn on_disconnect(&self) -> super::IntegrationResult<()> {
1779            let mut guard = self.disconnects.lock().unwrap();
1780            *guard += 1;
1781            Ok(())
1782        }
1783    }
1784
1785    #[test]
1786    fn lifecycle_hooks_are_called() {
1787        let connects = Arc::new(Mutex::new(0));
1788        let disconnects = Arc::new(Mutex::new(0));
1789        let mut lifecycle = LifecycleHooks::new();
1790        lifecycle.register_hook(Arc::new(CountingHook {
1791            connects: connects.clone(),
1792            disconnects: disconnects.clone(),
1793        }));
1794
1795        lifecycle.connect().unwrap();
1796        lifecycle.disconnect().unwrap();
1797        assert_eq!(*connects.lock().unwrap(), 1);
1798        assert_eq!(*disconnects.lock().unwrap(), 1);
1799    }
1800
1801    #[test]
1802    fn retry_policy_retries_transient_errors() {
1803        let mut calls = 0u32;
1804        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
1805            calls = attempt;
1806            if attempt < 3 {
1807                Err(IntegrationError::new(
1808                    "opensearch",
1809                    IntegrationErrorKind::Transient,
1810                    "temporary failure",
1811                ))
1812            } else {
1813                Ok("ok")
1814            }
1815        })
1816        .unwrap();
1817
1818        assert_eq!(result, "ok");
1819        assert_eq!(calls, 3);
1820    }
1821
1822    #[test]
1823    fn retry_policy_stops_on_permanent_errors() {
1824        let mut calls = 0u32;
1825        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
1826            calls = attempt;
1827            Err::<(), IntegrationError>(IntegrationError::new(
1828                "singlestore",
1829                IntegrationErrorKind::Permanent,
1830                "invalid sql",
1831            ))
1832        })
1833        .unwrap_err();
1834
1835        assert_eq!(calls, 1);
1836        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
1837    }
1838
1839    #[test]
1840    fn integration_result_maps_into_data_error() {
1841        let mapped = map_integration_result::<()>(
1842            "trigger",
1843            Err(IntegrationError::new(
1844                "trigger",
1845                IntegrationErrorKind::Unavailable,
1846                "service unavailable",
1847            )),
1848        )
1849        .unwrap_err();
1850
1851        assert!(mapped.to_string().contains("service unavailable"));
1852    }
1853
1854    #[derive(Default)]
1855    struct InMemoryJobs {
1856        statuses: Arc<Mutex<Vec<JobStatus>>>,
1857        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
1858    }
1859
1860    impl JobOrchestrator for InMemoryJobs {
1861        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
1862            let id = format!("job-{}", request.idempotency_key);
1863            self.statuses.lock().unwrap().push(JobStatus {
1864                id: id.clone(),
1865                state: JobState::Queued,
1866                attempts: 0,
1867                result: None,
1868                error: None,
1869            });
1870            Ok(JobHandle {
1871                id,
1872                workflow: request.workflow,
1873                idempotency_key: request.idempotency_key,
1874            })
1875        }
1876
1877        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
1878            self.statuses
1879                .lock()
1880                .unwrap()
1881                .iter()
1882                .find(|status| status.id == id)
1883                .cloned()
1884                .ok_or_else(|| {
1885                    IntegrationError::new(
1886                        "trigger",
1887                        IntegrationErrorKind::InvalidInput,
1888                        "job not found",
1889                    )
1890                })
1891        }
1892
1893        fn poll(
1894            &self,
1895            id: &str,
1896            _attempts: u32,
1897            _backoff_ms: u64,
1898        ) -> super::IntegrationResult<JobStatus> {
1899            self.status(id)
1900        }
1901
1902        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
1903            self.callbacks.lock().unwrap().push(callback);
1904        }
1905    }
1906
1907    #[test]
1908    fn job_orchestration_contract_supports_enqueue_and_status() {
1909        let jobs = InMemoryJobs::default();
1910        let handle = jobs
1911            .enqueue(JobRequest::new(
1912                "sync_customer",
1913                json!({"id": 42}),
1914                "idempotent-42",
1915            ))
1916            .unwrap();
1917        let status = jobs.status(&handle.id).unwrap();
1918        assert_eq!(status.state, JobState::Queued);
1919        assert_eq!(handle.idempotency_key, "idempotent-42");
1920
1921        let ctx = QueryContext::default().with_tenant_id("tenant-a");
1922        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
1923    }
1924
1925    #[test]
1926    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
1927        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
1928            "region".to_string(),
1929            Value::String("EMEA".to_string()),
1930        )])]);
1931        let rows = adapter
1932            .run_query(
1933                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
1934                &QueryContext::default(),
1935            )
1936            .unwrap();
1937        assert_eq!(rows.len(), 1);
1938        assert_eq!(
1939            rows[0].get("region"),
1940            Some(&Value::String("EMEA".to_string()))
1941        );
1942    }
1943
1944    #[test]
1945    fn reference_clickhouse_adapter_runs_typed_sql_boundary() {
1946        let adapter = InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
1947            "events".to_string(),
1948            Value::Number(128.into()),
1949        )])]);
1950        let rows = adapter
1951            .run_query(
1952                SqlCommand::new("SELECT events FROM account_events", Vec::new()),
1953                &QueryContext::default(),
1954            )
1955            .unwrap();
1956        assert_eq!(rows.len(), 1);
1957        assert_eq!(rows[0].get("events"), Some(&Value::Number(128.into())));
1958    }
1959
1960    #[test]
1961    fn reference_bigquery_adapter_runs_typed_sql_boundary() {
1962        let adapter = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
1963            "active_accounts".to_string(),
1964            Value::Number(64.into()),
1965        )])]);
1966        let rows = adapter
1967            .run_query(
1968                SqlCommand::new("SELECT active_accounts FROM analytics_rollup", Vec::new()),
1969                &QueryContext::default(),
1970            )
1971            .unwrap();
1972        assert_eq!(rows.len(), 1);
1973        assert_eq!(
1974            rows[0].get("active_accounts"),
1975            Some(&Value::Number(64.into()))
1976        );
1977    }
1978
1979    #[test]
1980    fn reference_opensearch_adapter_filters_rows() {
1981        let rows = vec![
1982            crate::StoredRow {
1983                id: 1,
1984                data: std::collections::BTreeMap::from([(
1985                    "title".to_string(),
1986                    Value::String("Acme renewal".to_string()),
1987                )]),
1988            },
1989            crate::StoredRow {
1990                id: 2,
1991                data: std::collections::BTreeMap::from([(
1992                    "title".to_string(),
1993                    Value::String("Globex onboarding".to_string()),
1994                )]),
1995            },
1996        ];
1997        let adapter = InMemoryOpenSearchAdapter::new(rows);
1998        let response = adapter
1999            .search(
2000                SearchRequest::new("accounts", "renewal"),
2001                &QueryContext::default(),
2002            )
2003            .unwrap();
2004        assert_eq!(response.total_hits, 1);
2005        assert_eq!(response.rows[0].id, 1);
2006    }
2007
2008    #[test]
2009    fn high_volume_window_query_profiles_support_compact_payloads() {
2010        let sql_rows = vec![
2011            std::collections::BTreeMap::from([
2012                ("tenant".to_string(), Value::String("north".to_string())),
2013                ("arr".to_string(), Value::Number(120.into())),
2014            ]),
2015            std::collections::BTreeMap::from([
2016                ("tenant".to_string(), Value::String("south".to_string())),
2017                ("arr".to_string(), Value::Number(210.into())),
2018            ]),
2019            std::collections::BTreeMap::from([
2020                ("tenant".to_string(), Value::String("west".to_string())),
2021                ("arr".to_string(), Value::Number(330.into())),
2022            ]),
2023            std::collections::BTreeMap::from([
2024                ("tenant".to_string(), Value::String("east".to_string())),
2025                ("arr".to_string(), Value::Number(480.into())),
2026            ]),
2027        ];
2028        let context = QueryContext::default();
2029        let request = DataWindowRequest::new("tenant_rollup", 1, 2)
2030            .with_query_token("q:m51")
2031            .compact();
2032
2033        let singlestore = InMemorySingleStoreAdapter::new(sql_rows.clone());
2034        let clickhouse = InMemoryClickHouseAdapter::new(sql_rows.clone());
2035        let bigquery = InMemoryBigQueryAdapter::new(sql_rows.clone());
2036
2037        for response in [
2038            singlestore
2039                .run_high_volume_window_query(
2040                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2041                    request.clone(),
2042                    &context,
2043                )
2044                .expect("singlestore window query"),
2045            clickhouse
2046                .run_high_volume_window_query(
2047                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2048                    request.clone(),
2049                    &context,
2050                )
2051                .expect("clickhouse window query"),
2052            bigquery
2053                .run_high_volume_window_query(
2054                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2055                    request.clone(),
2056                    &context,
2057                )
2058                .expect("bigquery window query"),
2059        ] {
2060            assert_eq!(response.dataset, "tenant_rollup");
2061            assert_eq!(response.rows.len(), 2);
2062            assert_eq!(response.total_rows, 4);
2063            assert_eq!(response.query_token, "q:m51");
2064            let compact = response.compact_rows.expect("compact payload");
2065            assert!(compact.columns.contains(&"tenant".to_string()));
2066            assert!(compact.columns.contains(&"arr".to_string()));
2067            assert_eq!(compact.rows.len(), response.rows.len());
2068        }
2069    }
2070
2071    #[test]
2072    fn opensearch_window_search_uses_window_contract() {
2073        let rows = vec![
2074            crate::StoredRow {
2075                id: 1,
2076                data: std::collections::BTreeMap::from([
2077                    ("tenant".to_string(), Value::String("acme".to_string())),
2078                    ("status".to_string(), Value::String("healthy".to_string())),
2079                ]),
2080            },
2081            crate::StoredRow {
2082                id: 2,
2083                data: std::collections::BTreeMap::from([
2084                    ("tenant".to_string(), Value::String("globex".to_string())),
2085                    ("status".to_string(), Value::String("renewal".to_string())),
2086                ]),
2087            },
2088            crate::StoredRow {
2089                id: 3,
2090                data: std::collections::BTreeMap::from([
2091                    ("tenant".to_string(), Value::String("initech".to_string())),
2092                    ("status".to_string(), Value::String("at-risk".to_string())),
2093                ]),
2094            },
2095        ];
2096        let adapter = InMemoryOpenSearchAdapter::new(rows);
2097        let response = adapter
2098            .search_window(
2099                SearchRequest::new("accounts", ""),
2100                DataWindowRequest::new("accounts", 1, 2).compact(),
2101                &QueryContext::default(),
2102            )
2103            .expect("opensearch window search");
2104
2105        assert_eq!(response.dataset, "accounts");
2106        assert_eq!(response.offset, 1);
2107        assert_eq!(response.limit, 2);
2108        assert_eq!(response.total_rows, 3);
2109        assert_eq!(response.rows.len(), 2);
2110        assert!(response.compact_rows.is_some());
2111        assert!(!response.window_token.is_empty());
2112    }
2113
2114    #[test]
2115    fn reference_axiom_sink_records_events() {
2116        let sink = InMemoryAxiomSink::default();
2117        sink.send_event(AnalyticsEvent::new(
2118            "sales",
2119            "query_executed",
2120            json!({"latency_ms": 12}),
2121        ))
2122        .unwrap();
2123        let events = sink.events();
2124        assert_eq!(events.len(), 1);
2125        assert_eq!(events[0].name, "query_executed");
2126    }
2127
2128    #[test]
2129    fn reference_trigger_orchestrator_supports_completion_and_polling() {
2130        let orchestrator = InMemoryJobOrchestrator::default();
2131        let completed = Arc::new(Mutex::new(false));
2132        let completed_flag = completed.clone();
2133        orchestrator.register_completion_callback(Arc::new(move |status| {
2134            if status.state == JobState::Succeeded {
2135                if let Ok(mut guard) = completed_flag.lock() {
2136                    *guard = true;
2137                }
2138            }
2139        }));
2140
2141        let handle = orchestrator
2142            .enqueue(JobRequest::new(
2143                "refresh_dashboard",
2144                json!({"account_id": 7}),
2145                "refresh-7",
2146            ))
2147            .unwrap();
2148        orchestrator
2149            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
2150            .unwrap();
2151        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
2152        assert_eq!(status.state, JobState::Succeeded);
2153        assert_eq!(
2154            status.result,
2155            Some(json!({
2156                "rows_synced": 18
2157            }))
2158        );
2159        assert!(*completed.lock().unwrap());
2160    }
2161
2162    #[test]
2163    fn query_context_helpers_encode_trace_correlation_retry_timeout() {
2164        let context = QueryContext::default()
2165            .with_correlation_id("corr-42")
2166            .with_request_id("req-42")
2167            .with_timeout_ms(900)
2168            .with_retry_policy(RetryPolicy {
2169                max_attempts: 4,
2170                initial_backoff_ms: 25,
2171                max_backoff_ms: 100,
2172            });
2173
2174        assert_eq!(
2175            context
2176                .tags
2177                .get(CONTEXT_TAG_CORRELATION_ID)
2178                .map(String::as_str),
2179            Some("corr-42")
2180        );
2181        assert_eq!(
2182            context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
2183            Some("req-42")
2184        );
2185        assert_eq!(
2186            context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
2187            Some("900")
2188        );
2189        assert_eq!(context.correlation_id(), Some("corr-42"));
2190        assert_eq!(context.request_id(), Some("req-42"));
2191        assert_eq!(context.timeout_ms(), Some(900));
2192        assert_eq!(
2193            context.retry_policy_override(),
2194            Some(RetryPolicy {
2195                max_attempts: 4,
2196                initial_backoff_ms: 25,
2197                max_backoff_ms: 100
2198            })
2199        );
2200    }
2201
2202    #[test]
2203    fn run_with_contract_respects_context_retry_override() {
2204        let context = QueryContext::default().with_retry_policy(RetryPolicy {
2205            max_attempts: 2,
2206            initial_backoff_ms: 0,
2207            max_backoff_ms: 0,
2208        });
2209        let mut attempts = 0u32;
2210        let result = run_with_contract(
2211            "opensearch",
2212            "search",
2213            AdapterCallContract::default()
2214                .with_retry_policy(RetryPolicy::never())
2215                .with_timeout_ms(1_000),
2216            &context,
2217            |attempt| {
2218                attempts = attempt;
2219                if attempt == 1 {
2220                    Err(IntegrationError::new(
2221                        "opensearch",
2222                        IntegrationErrorKind::Transient,
2223                        "temporary failure",
2224                    ))
2225                } else {
2226                    Ok("ok")
2227                }
2228            },
2229        )
2230        .unwrap();
2231        assert_eq!(result, "ok");
2232        assert_eq!(attempts, 2);
2233    }
2234
2235    #[tokio::test]
2236    async fn run_with_contract_async_respects_context_retry_override() {
2237        let context = QueryContext::default().with_retry_policy(RetryPolicy {
2238            max_attempts: 2,
2239            initial_backoff_ms: 0,
2240            max_backoff_ms: 0,
2241        });
2242        let attempts = Arc::new(Mutex::new(0u32));
2243        let attempts_for_closure = attempts.clone();
2244        let result = run_with_contract_async(
2245            "opensearch",
2246            "search_async",
2247            AdapterCallContract::default()
2248                .with_retry_policy(RetryPolicy::never())
2249                .with_timeout_ms(1_000),
2250            &context,
2251            move |attempt| {
2252                let attempts_for_step = attempts_for_closure.clone();
2253                async move {
2254                    if let Ok(mut guard) = attempts_for_step.lock() {
2255                        *guard = attempt;
2256                    }
2257                    if attempt == 1 {
2258                        Err(IntegrationError::new(
2259                            "opensearch",
2260                            IntegrationErrorKind::Transient,
2261                            "temporary failure",
2262                        ))
2263                    } else {
2264                        Ok("ok")
2265                    }
2266                }
2267            },
2268        )
2269        .await
2270        .unwrap();
2271        assert_eq!(result, "ok");
2272        assert_eq!(*attempts.lock().unwrap(), 2);
2273    }
2274
2275    #[tokio::test]
2276    async fn run_with_retry_async_retries_transient_errors() {
2277        let calls = Arc::new(Mutex::new(0u32));
2278        let calls_for_closure = calls.clone();
2279        let result = run_with_retry_async(RetryPolicy::conservative(), move |attempt| {
2280            let calls_for_step = calls_for_closure.clone();
2281            async move {
2282                if let Ok(mut guard) = calls_for_step.lock() {
2283                    *guard = attempt;
2284                }
2285                if attempt < 3 {
2286                    Err(IntegrationError::new(
2287                        "bigquery",
2288                        IntegrationErrorKind::Transient,
2289                        "temporary failure",
2290                    ))
2291                } else {
2292                    Ok("ok")
2293                }
2294            }
2295        })
2296        .await
2297        .unwrap();
2298        assert_eq!(result, "ok");
2299        assert_eq!(*calls.lock().unwrap(), 3);
2300    }
2301
2302    #[test]
2303    fn run_with_contract_returns_timeout_error() {
2304        let context = QueryContext::default().with_timeout_ms(5);
2305        let err = run_with_contract(
2306            "singlestore",
2307            "run_query",
2308            AdapterCallContract::default(),
2309            &context,
2310            |_| {
2311                std::thread::sleep(std::time::Duration::from_millis(15));
2312                Ok::<_, IntegrationError>("done")
2313            },
2314        )
2315        .unwrap_err();
2316        assert_eq!(err.kind, IntegrationErrorKind::Timeout);
2317        assert_eq!(err.code.as_deref(), Some("operation_timeout"));
2318    }
2319
2320    #[test]
2321    fn axiom_bridge_enriches_events_with_trace_and_correlation() {
2322        let sink = Arc::new(InMemoryAxiomSink::default());
2323        let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
2324        let context = QueryContext {
2325            tenant_id: Some("tenant-a".to_string()),
2326            trace_id: Some("trace-123".to_string()),
2327            tags: std::collections::BTreeMap::new(),
2328        }
2329        .with_correlation_id("corr-9")
2330        .with_request_id("req-9");
2331        bridge
2332            .emit("session_event", json!({"event":"patch"}), &context)
2333            .unwrap();
2334        let events = sink.events();
2335        assert_eq!(events.len(), 1);
2336        let payload = events[0].payload.as_object().unwrap();
2337        assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
2338        assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
2339        assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
2340    }
2341
2342    #[test]
2343    fn conformance_suite_passes_with_reference_adapters() {
2344        let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
2345            [("tenant".to_string(), Value::String("north".to_string()))],
2346        )]);
2347        let clickhouse =
2348            InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
2349                "events".to_string(),
2350                Value::Number(99.into()),
2351            )])]);
2352        let bigquery = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
2353            "accounts".to_string(),
2354            Value::Number(33.into()),
2355        )])]);
2356        let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
2357            id: 1,
2358            data: std::collections::BTreeMap::from([(
2359                "title".to_string(),
2360                Value::String("Acme renewal".to_string()),
2361            )]),
2362        }]);
2363        let trigger = InMemoryJobOrchestrator::default();
2364        let analytics = InMemoryAxiomSink::default();
2365        let report = run_adapter_conformance_suite(
2366            &singlestore,
2367            &clickhouse,
2368            &bigquery,
2369            &opensearch,
2370            &trigger,
2371            &analytics,
2372            &QueryContext::default()
2373                .with_correlation_id("corr-1")
2374                .with_request_id("req-1"),
2375        );
2376        assert!(report.passed(), "report={report:?}");
2377        assert!(report.checks.len() >= 8);
2378    }
2379
2380    #[test]
2381    fn adapter_error_paths_cover_invalid_inputs_and_failure_reporting() {
2382        let context = QueryContext::default();
2383        let empty_sql = SqlCommand::new("   ", Vec::new());
2384        let singlestore = InMemorySingleStoreAdapter::default();
2385        let clickhouse = InMemoryClickHouseAdapter::default();
2386        let bigquery = InMemoryBigQueryAdapter::default();
2387        let opensearch = InMemoryOpenSearchAdapter::default();
2388
2389        for err in [
2390            singlestore
2391                .run_query(empty_sql.clone(), &context)
2392                .unwrap_err(),
2393            clickhouse.run_query(empty_sql.clone(), &context).unwrap_err(),
2394            bigquery.run_query(empty_sql, &context).unwrap_err(),
2395        ] {
2396            assert_eq!(err.kind, IntegrationErrorKind::InvalidInput);
2397            assert_eq!(err.code.as_deref(), Some("empty_statement"));
2398        }
2399
2400        let search_err = opensearch
2401            .search(SearchRequest::new("   ", "needle"), &context)
2402            .unwrap_err();
2403        assert_eq!(search_err.kind, IntegrationErrorKind::InvalidInput);
2404        assert_eq!(search_err.code.as_deref(), Some("empty_index"));
2405    }
2406
2407    struct FailingSqlAdapter {
2408        source: &'static str,
2409    }
2410
2411    impl TypedQueryBoundary for FailingSqlAdapter {
2412        type Request = SqlCommand;
2413        type Response = Vec<crate::Row>;
2414
2415        fn execute(
2416            &self,
2417            _request: &Self::Request,
2418            _context: &QueryContext,
2419        ) -> super::IntegrationResult<Self::Response> {
2420            Err(IntegrationError::new(
2421                self.source,
2422                IntegrationErrorKind::Unavailable,
2423                "backend unavailable",
2424            )
2425            .with_code("backend_down"))
2426        }
2427    }
2428
2429    impl SingleStoreAdapter for FailingSqlAdapter {}
2430    impl ClickHouseAdapter for FailingSqlAdapter {}
2431    impl BigQueryAdapter for FailingSqlAdapter {}
2432
2433    struct FailingSearchAdapter;
2434
2435    impl TypedQueryBoundary for FailingSearchAdapter {
2436        type Request = SearchRequest;
2437        type Response = SearchResponse;
2438
2439        fn execute(
2440            &self,
2441            _request: &Self::Request,
2442            _context: &QueryContext,
2443        ) -> super::IntegrationResult<Self::Response> {
2444            Err(IntegrationError::new(
2445                "opensearch",
2446                IntegrationErrorKind::Unavailable,
2447                "search unavailable",
2448            ))
2449        }
2450    }
2451
2452    impl OpenSearchAdapter for FailingSearchAdapter {}
2453
2454    struct FailingTrigger;
2455
2456    impl TriggerDevAdapter for FailingTrigger {
2457        fn trigger_workflow(&self, _request: JobRequest) -> super::IntegrationResult<JobHandle> {
2458            Err(IntegrationError::new(
2459                "trigger",
2460                IntegrationErrorKind::Unavailable,
2461                "trigger unavailable",
2462            ))
2463        }
2464
2465        fn workflow_status(&self, _id: &str) -> super::IntegrationResult<JobStatus> {
2466            Err(IntegrationError::new(
2467                "trigger",
2468                IntegrationErrorKind::Unavailable,
2469                "trigger unavailable",
2470            ))
2471        }
2472
2473        fn poll_workflow(
2474            &self,
2475            _id: &str,
2476            _attempts: u32,
2477            _backoff_ms: u64,
2478        ) -> super::IntegrationResult<JobStatus> {
2479            Err(IntegrationError::new(
2480                "trigger",
2481                IntegrationErrorKind::Unavailable,
2482                "trigger unavailable",
2483            ))
2484        }
2485    }
2486
2487    #[derive(Default)]
2488    struct FailingAnalyticsSink;
2489
2490    impl AnalyticsSink for FailingAnalyticsSink {
2491        fn send_event(&self, _event: AnalyticsEvent) -> super::IntegrationResult<()> {
2492            Err(IntegrationError::new(
2493                "analytics",
2494                IntegrationErrorKind::Unavailable,
2495                "sink unavailable",
2496            ))
2497        }
2498    }
2499
2500    #[test]
2501    fn conformance_suite_captures_failures_for_unavailable_dependencies() {
2502        let failing_sql = FailingSqlAdapter { source: "sql" };
2503        let failing_search = FailingSearchAdapter;
2504        let failing_trigger = FailingTrigger;
2505        let failing_analytics = FailingAnalyticsSink;
2506
2507        let report = run_adapter_conformance_suite(
2508            &failing_sql,
2509            &failing_sql,
2510            &failing_sql,
2511            &failing_search,
2512            &failing_trigger,
2513            &failing_analytics,
2514            &QueryContext::default(),
2515        );
2516
2517        assert!(!report.passed());
2518        assert!(report.checks.iter().any(|check| {
2519            check.name == "singlestore.query" && !check.passed
2520        }));
2521        assert!(report.checks.iter().any(|check| {
2522            check.name == "opensearch.search" && !check.passed
2523        }));
2524        assert!(report.checks.iter().any(|check| {
2525            check.name == "trigger.enqueue" && !check.passed
2526        }));
2527        assert!(report.checks.iter().any(|check| {
2528            check.name == "analytics.emit" && !check.passed
2529        }));
2530        assert!(report.checks.iter().any(|check| {
2531            check.name == "context.correlation" && !check.passed
2532        }));
2533    }
2534}