Skip to main content

shelly_data/
integrations.rs

1use crate::{
2    error::{DataError, DataResult},
3    query::Query,
4    repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    collections::{BTreeMap, HashMap},
10    fmt,
11    sync::{Arc, Mutex},
12    thread,
13    time::{Duration, Instant},
14};
15use tracing::{info, warn};
16
/// Well-known `QueryContext` tag keys; read back by the accessor methods on
/// `QueryContext` (`correlation_id`, `request_id`, `timeout_ms`,
/// `retry_policy_override`).
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
// Per-call timeout override, stored as a stringified millisecond count.
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
// Retry-policy override, stored as three stringified numeric tags.
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
23
/// Classification of an integration failure; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    Transient,
    Permanent,
    Auth,
    RateLimited,
    Timeout,
    Unavailable,
    InvalidInput,
}

/// An error surfaced by an external integration boundary (database, search,
/// analytics, job orchestration).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
    // Logical origin, e.g. "singlestore", "opensearch", "axiom", "trigger".
    pub source: String,
    pub kind: IntegrationErrorKind,
    pub message: String,
    // Optional machine-readable code, e.g. "empty_statement".
    pub code: Option<String>,
    // Whether the retry helpers in this module may re-attempt the call.
    pub retryable: bool,
}
44
45impl IntegrationError {
46    pub fn new(
47        source: impl Into<String>,
48        kind: IntegrationErrorKind,
49        message: impl Into<String>,
50    ) -> Self {
51        let kind_value = kind;
52        Self {
53            source: source.into(),
54            kind: kind_value,
55            message: message.into(),
56            code: None,
57            retryable: matches!(
58                kind_value,
59                IntegrationErrorKind::Transient
60                    | IntegrationErrorKind::RateLimited
61                    | IntegrationErrorKind::Timeout
62                    | IntegrationErrorKind::Unavailable
63            ),
64        }
65    }
66
67    pub fn with_code(mut self, code: impl Into<String>) -> Self {
68        self.code = Some(code.into());
69        self
70    }
71}
72
73impl fmt::Display for IntegrationError {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        if let Some(code) = &self.code {
76            write!(f, "[{}:{}] {}", self.source, code, self.message)
77        } else {
78            write!(f, "[{}] {}", self.source, self.message)
79        }
80    }
81}
82
impl std::error::Error for IntegrationError {}

/// Convenience alias used by every integration boundary in this module.
pub type IntegrationResult<T> = Result<T, IntegrationError>;
86
/// Retry settings: attempt budget plus exponential-backoff bounds (ms).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
    // Total attempts, including the first call; clamped to >= 1 by consumers.
    pub max_attempts: u32,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
}
93
94impl RetryPolicy {
95    pub fn conservative() -> Self {
96        Self {
97            max_attempts: 3,
98            initial_backoff_ms: 50,
99            max_backoff_ms: 500,
100        }
101    }
102
103    pub fn never() -> Self {
104        Self {
105            max_attempts: 1,
106            initial_backoff_ms: 0,
107            max_backoff_ms: 0,
108        }
109    }
110}
111
impl Default for RetryPolicy {
    /// Defaults to the conservative policy (3 attempts, 50–500ms backoff).
    fn default() -> Self {
        Self::conservative()
    }
}
117
/// Per-call contract pairing a retry policy with a timeout budget (ms).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdapterCallContract {
    pub retry_policy: RetryPolicy,
    pub timeout_ms: u64,
}
123
124impl AdapterCallContract {
125    pub fn low_latency() -> Self {
126        Self {
127            retry_policy: RetryPolicy::never(),
128            timeout_ms: 250,
129        }
130    }
131
132    pub fn default_query() -> Self {
133        Self {
134            retry_policy: RetryPolicy::conservative(),
135            timeout_ms: 1_500,
136        }
137    }
138
139    pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
140        self.retry_policy = retry_policy;
141        self
142    }
143
144    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
145        self.timeout_ms = timeout_ms.max(1);
146        self
147    }
148}
149
impl Default for AdapterCallContract {
    /// Defaults to the standard query contract (see `default_query`).
    fn default() -> Self {
        Self::default_query()
    }
}
155
/// Runs `operation_fn` under `contract`, honoring any retry/timeout overrides
/// carried in `context` tags, with exponential backoff between retryable
/// failures.
///
/// The closure receives the 1-based attempt number. A non-retryable error, or
/// the error from the final attempt, is returned as-is; retryable failures on
/// earlier attempts are logged and retried after a backoff sleep.
pub fn run_with_contract<T, F>(
    source: &str,
    operation: &str,
    contract: AdapterCallContract,
    context: &QueryContext,
    mut operation_fn: F,
) -> IntegrationResult<T>
where
    F: FnMut(u32) -> IntegrationResult<T>,
{
    // Context tags win over the contract's own policy and timeout.
    let policy = context
        .retry_policy_override()
        .unwrap_or(contract.retry_policy);
    let attempts = policy.max_attempts.max(1);
    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
    let mut backoff_ms = policy.initial_backoff_ms;

    for attempt in 1..=attempts {
        let started_at = Instant::now();
        let result = operation_fn(attempt);
        let elapsed_ms = started_at.elapsed().as_millis() as u64;
        // The timeout is enforced post-hoc, per attempt: the closure runs to
        // completion and its wall-clock time is checked afterwards.
        // NOTE(review): this also discards an `Ok` result that arrived after
        // the budget — confirm that is intended.
        if elapsed_ms > timeout_ms {
            return Err(IntegrationError::new(
                source,
                IntegrationErrorKind::Timeout,
                format!(
                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
                    elapsed_ms, timeout_ms
                ),
            )
            .with_code("operation_timeout"));
        }

        match result {
            Ok(value) => return Ok(value),
            // Non-retryable, or out of attempts: propagate unchanged.
            Err(err) if !err.retryable || attempt == attempts => return Err(err),
            Err(err) => {
                warn!(
                    target: "shelly.integration.query",
                    source,
                    operation,
                    attempt,
                    max_attempts = attempts,
                    tenant_id = ?context.tenant_id,
                    trace_id = ?context.trace_id,
                    correlation_id = context.correlation_id().unwrap_or("-"),
                    request_id = context.request_id().unwrap_or("-"),
                    timeout_ms,
                    backoff_ms,
                    error = %err,
                    "Shelly integration retrying transient failure"
                );
                if backoff_ms > 0 {
                    thread::sleep(Duration::from_millis(backoff_ms));
                }
                // Double the backoff, capped at max_backoff_ms (floored to 1
                // so the cap can never be zero).
                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
            }
        }
    }

    // Unreachable in practice — the last iteration always returns above —
    // but required so the function has a value on every path.
    Err(IntegrationError::new(
        source,
        IntegrationErrorKind::Transient,
        format!("{operation} retry exhausted"),
    )
    .with_code("retry_exhausted"))
}
223
224pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
225where
226    F: FnMut(u32) -> IntegrationResult<T>,
227{
228    let attempts = policy.max_attempts.max(1);
229    let mut backoff_ms = policy.initial_backoff_ms;
230
231    for attempt in 1..=attempts {
232        match operation(attempt) {
233            Ok(value) => return Ok(value),
234            Err(err) if !err.retryable || attempt == attempts => return Err(err),
235            Err(_) => {
236                if backoff_ms > 0 {
237                    thread::sleep(Duration::from_millis(backoff_ms));
238                }
239                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
240            }
241        }
242    }
243
244    Err(IntegrationError::new(
245        "retry",
246        IntegrationErrorKind::Transient,
247        "retry exhausted",
248    ))
249}
250
/// Optional callbacks invoked around connection setup/teardown.
/// Both default to no-ops.
pub trait ConnectionLifecycleHook: Send + Sync {
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}

/// A connection-like resource that can register hooks and open/close itself.
pub trait ConnectionLifecycle {
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    fn connect(&self) -> IntegrationResult<()>;
    fn disconnect(&self) -> IntegrationResult<()>;
}
266
/// An ordered collection of lifecycle hooks, run in registration order.
#[derive(Default)]
pub struct LifecycleHooks {
    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}

impl LifecycleHooks {
    /// Creates an empty hook collection.
    pub fn new() -> Self {
        Self::default()
    }
}
277
278impl ConnectionLifecycle for LifecycleHooks {
279    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
280        self.hooks.push(hook);
281    }
282
283    fn connect(&self) -> IntegrationResult<()> {
284        for hook in &self.hooks {
285            hook.on_connect()?;
286        }
287        Ok(())
288    }
289
290    fn disconnect(&self) -> IntegrationResult<()> {
291        for hook in &self.hooks {
292            hook.on_disconnect()?;
293        }
294        Ok(())
295    }
296}
297
/// Ambient metadata carried alongside every integration call: optional tenant
/// and trace identifiers plus free-form string tags (see the
/// `CONTEXT_TAG_*` constants for the recognized keys).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    pub tenant_id: Option<String>,
    pub trace_id: Option<String>,
    // BTreeMap keeps tag serialization order deterministic.
    pub tags: BTreeMap<String, String>,
}
304
305impl QueryContext {
306    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
307        self.tags.insert(key.into(), value.into());
308        self
309    }
310
311    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
312        self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
313    }
314
315    pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
316        self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
317    }
318
319    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
320        self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
321    }
322
323    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
324        self.with_tag(
325            CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
326            policy.max_attempts.to_string(),
327        )
328        .with_tag(
329            CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
330            policy.initial_backoff_ms.to_string(),
331        )
332        .with_tag(
333            CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
334            policy.max_backoff_ms.to_string(),
335        )
336    }
337
338    pub fn correlation_id(&self) -> Option<&str> {
339        self.tags
340            .get(CONTEXT_TAG_CORRELATION_ID)
341            .map(String::as_str)
342            .map(str::trim)
343            .filter(|value| !value.is_empty())
344    }
345
346    pub fn request_id(&self) -> Option<&str> {
347        self.tags
348            .get(CONTEXT_TAG_REQUEST_ID)
349            .map(String::as_str)
350            .map(str::trim)
351            .filter(|value| !value.is_empty())
352    }
353
354    pub fn timeout_ms(&self) -> Option<u64> {
355        self.tags
356            .get(CONTEXT_TAG_TIMEOUT_MS)
357            .and_then(|value| value.parse::<u64>().ok())
358            .filter(|value| *value > 0)
359    }
360
361    pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
362        let max_attempts = self
363            .tags
364            .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
365            .and_then(|value| value.parse::<u32>().ok())?;
366        let initial_backoff_ms = self
367            .tags
368            .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
369            .and_then(|value| value.parse::<u64>().ok())
370            .unwrap_or(0);
371        let max_backoff_ms = self
372            .tags
373            .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
374            .and_then(|value| value.parse::<u64>().ok())
375            .unwrap_or(initial_backoff_ms);
376        Some(RetryPolicy {
377            max_attempts: max_attempts.max(1),
378            initial_backoff_ms,
379            max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
380        })
381    }
382}
383
/// A strongly-typed request/response boundary around an external system.
/// Adapter traits below fix `Request`/`Response` to concrete types.
pub trait TypedQueryBoundary {
    type Request: Send + Sync;
    type Response: Send + Sync;

    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
394
/// A SQL statement plus its positional parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
    pub statement: String,
    pub params: Vec<Value>,
}

impl SqlCommand {
    /// Builds a command from a statement and its positional parameters.
    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
        Self {
            statement: statement.into(),
            params,
        }
    }
}
409
/// SQL boundary for SingleStore. The provided `run_query` wraps `execute`
/// with structured success/failure logging; implementors only supply
/// `execute`.
pub trait SingleStoreAdapter:
    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
{
    /// Executes `query` and logs the outcome (statement, parameter count,
    /// row count, duration) to the `shelly.integration.query` target.
    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
        let started_at = Instant::now();
        // Captured before the call so both log branches can reference them.
        let statement = query.statement.as_str();
        let param_count = query.params.len();
        let result = self.execute(&query, context);
        match &result {
            Ok(rows) => info!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement,
                param_count,
                row_count = rows.len(),
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "singlestore",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                statement,
                param_count,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }
}
449
/// A full-text search request with optional filters and 1-based pagination.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    pub index: String,
    pub text: String,
    pub filters: Vec<crate::query::Filter>,
    // 1-based page number.
    pub page: usize,
    pub per_page: usize,
}
458
459impl SearchRequest {
460    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
461        Self {
462            index: index.into(),
463            text: text.into(),
464            filters: Vec::new(),
465            page: 1,
466            per_page: 25,
467        }
468    }
469
470    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
471        self.filters.push(filter);
472        self
473    }
474}
475
/// A page of search results plus the pre-pagination hit count.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    // Total matches before pagination was applied.
    pub total_hits: usize,
    pub rows: Vec<StoredRow>,
}
481
/// Search boundary for OpenSearch. The provided `search` wraps `execute`
/// with structured success/failure logging; implementors only supply
/// `execute`.
pub trait OpenSearchAdapter:
    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
{
    /// Executes `request` and logs the outcome (index, query text, filter
    /// count, pagination, hits, duration) to `shelly.integration.query`.
    fn search(
        &self,
        request: SearchRequest,
        context: &QueryContext,
    ) -> IntegrationResult<SearchResponse> {
        let started_at = Instant::now();
        // Captured before the call so both log branches can reference them.
        let index = request.index.as_str();
        let text = request.text.as_str();
        let filter_count = request.filters.len();
        let page = request.page;
        let per_page = request.per_page;
        let result = self.execute(&request, context);
        match &result {
            Ok(response) => info!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                row_count = response.rows.len(),
                total_hits = response.total_hits,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }
}
535
/// Destination for analytics events (e.g. Axiom, or the in-memory test sink).
pub trait AnalyticsSink: Send + Sync {
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}

/// A namespaced, named analytics event with an arbitrary JSON payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
    pub namespace: String,
    pub name: String,
    pub payload: Value,
}
546
547impl AnalyticsEvent {
548    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
549        Self {
550            namespace: namespace.into(),
551            name: name.into(),
552            payload,
553        }
554    }
555}
556
/// A request to run a named workflow with a payload and idempotency key.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
    pub workflow: String,
    pub payload: Value,
    pub idempotency_key: String,
    pub metadata: BTreeMap<String, String>,
}
564
565impl JobRequest {
566    pub fn new(
567        workflow: impl Into<String>,
568        payload: Value,
569        idempotency_key: impl Into<String>,
570    ) -> Self {
571        Self {
572            workflow: workflow.into(),
573            payload,
574            idempotency_key: idempotency_key.into(),
575            metadata: BTreeMap::new(),
576        }
577    }
578}
579
/// Lifecycle states of a job; `Succeeded`/`Failed`/`Canceled` are terminal
/// (see `InMemoryJobOrchestrator::is_terminal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}

/// A snapshot of a job's progress.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    pub id: String,
    pub state: JobState,
    // Number of recorded status updates for this job.
    pub attempts: u32,
    // Present only after a successful completion.
    pub result: Option<Value>,
    // Present only after a failure.
    pub error: Option<IntegrationError>,
}

/// An opaque handle returned when a job is enqueued.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    pub id: String,
    pub workflow: String,
    pub idempotency_key: String,
}
605
/// Callback fired with a status snapshot when a job completes.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;

/// Queue-style job orchestration boundary.
pub trait JobOrchestrator: Send + Sync {
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    // Repeatedly fetches status, up to `attempts` times with `backoff_ms`
    // sleeps between polls.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
614
/// Trigger.dev-flavored workflow API; blanket-implemented for every
/// `JobOrchestrator` below, so it usually needs no manual impl.
pub trait TriggerDevAdapter: Send + Sync {
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus>;
}
625
626impl<T> TriggerDevAdapter for T
627where
628    T: JobOrchestrator + Send + Sync,
629{
630    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
631        self.enqueue(request)
632    }
633
634    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
635        self.status(id)
636    }
637
638    fn poll_workflow(
639        &self,
640        id: &str,
641        attempts: u32,
642        backoff_ms: u64,
643    ) -> IntegrationResult<JobStatus> {
644        self.poll(id, attempts, backoff_ms)
645    }
646}
647
/// Forwards telemetry to an `AnalyticsSink` under a fixed namespace,
/// enriching each payload with `QueryContext` identifiers.
#[derive(Clone)]
pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
    sink: Arc<S>,
    namespace: String,
}
653
654impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
655    pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
656        Self {
657            sink,
658            namespace: namespace.into(),
659        }
660    }
661
662    pub fn emit(
663        &self,
664        name: impl Into<String>,
665        payload: Value,
666        context: &QueryContext,
667    ) -> IntegrationResult<()> {
668        let mut envelope = serde_json::Map::new();
669        envelope.insert("payload".to_string(), payload);
670        envelope.insert(
671            "tenant_id".to_string(),
672            context
673                .tenant_id
674                .as_ref()
675                .map(|value| Value::String(value.clone()))
676                .unwrap_or(Value::Null),
677        );
678        envelope.insert(
679            "trace_id".to_string(),
680            context
681                .trace_id
682                .as_ref()
683                .map(|value| Value::String(value.clone()))
684                .unwrap_or(Value::Null),
685        );
686        envelope.insert(
687            "correlation_id".to_string(),
688            context
689                .correlation_id()
690                .map(|value| Value::String(value.to_string()))
691                .unwrap_or(Value::Null),
692        );
693        envelope.insert(
694            "request_id".to_string(),
695            context
696                .request_id()
697                .map(|value| Value::String(value.to_string()))
698                .unwrap_or(Value::Null),
699        );
700        envelope.insert("tags".to_string(), serde_json::json!(context.tags));
701
702        self.sink.send_event(AnalyticsEvent::new(
703            self.namespace.clone(),
704            name.into(),
705            Value::Object(envelope),
706        ))
707    }
708}
709
/// Test double for `SingleStoreAdapter`: returns a fixed set of rows for any
/// non-blank statement.
#[derive(Debug, Clone, Default)]
pub struct InMemorySingleStoreAdapter {
    rows: Vec<Row>,
}

impl InMemorySingleStoreAdapter {
    /// Creates an adapter seeded with the rows every query will return.
    pub fn new(rows: Vec<Row>) -> Self {
        Self { rows }
    }
}
720
721impl TypedQueryBoundary for InMemorySingleStoreAdapter {
722    type Request = SqlCommand;
723    type Response = Vec<Row>;
724
725    fn execute(
726        &self,
727        request: &Self::Request,
728        _context: &QueryContext,
729    ) -> IntegrationResult<Self::Response> {
730        if request.statement.trim().is_empty() {
731            return Err(IntegrationError::new(
732                "singlestore",
733                IntegrationErrorKind::InvalidInput,
734                "empty SQL statement",
735            )
736            .with_code("empty_statement"));
737        }
738        Ok(self.rows.clone())
739    }
740}
741
// Inherits the logging `run_query` wrapper from `SingleStoreAdapter`.
impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
743
/// Test double for `OpenSearchAdapter`: substring-matches over seeded rows.
#[derive(Debug, Clone, Default)]
pub struct InMemoryOpenSearchAdapter {
    rows: Vec<StoredRow>,
}

impl InMemoryOpenSearchAdapter {
    /// Creates an adapter seeded with the searchable row corpus.
    pub fn new(rows: Vec<StoredRow>) -> Self {
        Self { rows }
    }
}
754
755impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
756    type Request = SearchRequest;
757    type Response = SearchResponse;
758
759    fn execute(
760        &self,
761        request: &Self::Request,
762        _context: &QueryContext,
763    ) -> IntegrationResult<Self::Response> {
764        if request.index.trim().is_empty() {
765            return Err(IntegrationError::new(
766                "opensearch",
767                IntegrationErrorKind::InvalidInput,
768                "search index must not be empty",
769            )
770            .with_code("empty_index"));
771        }
772        let needle = request.text.trim().to_lowercase();
773        let mut filtered = self
774            .rows
775            .iter()
776            .filter(|row| {
777                if needle.is_empty() {
778                    return true;
779                }
780                row.data.values().any(|value| {
781                    value
782                        .as_str()
783                        .map(|text| text.to_lowercase().contains(&needle))
784                        .unwrap_or(false)
785                })
786            })
787            .cloned()
788            .collect::<Vec<_>>();
789        let total_hits = filtered.len();
790        let page = request.page.max(1);
791        let per_page = request.per_page.max(1);
792        let offset = (page - 1) * per_page;
793        filtered = filtered.into_iter().skip(offset).take(per_page).collect();
794        Ok(SearchResponse {
795            total_hits,
796            rows: filtered,
797        })
798    }
799}
800
// Inherits the logging `search` wrapper from `OpenSearchAdapter`.
impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
802
/// Test double for `AnalyticsSink`: records events in memory.
/// Cloning shares the same underlying buffer (`Arc`).
#[derive(Debug, Default, Clone)]
pub struct InMemoryAxiomSink {
    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
}
807
808impl InMemoryAxiomSink {
809    pub fn events(&self) -> Vec<AnalyticsEvent> {
810        self.events
811            .lock()
812            .map(|events| events.clone())
813            .unwrap_or_default()
814    }
815}
816
817impl AnalyticsSink for InMemoryAxiomSink {
818    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
819        let started_at = Instant::now();
820        let namespace = event.namespace.clone();
821        let name = event.name.clone();
822        self.events
823            .lock()
824            .map_err(|_| {
825                IntegrationError::new(
826                    "axiom",
827                    IntegrationErrorKind::Unavailable,
828                    "analytics sink lock poisoned",
829                )
830            })?
831            .push(event);
832        info!(
833            target: "shelly.integration.query",
834            source = "axiom",
835            operation = "send_event",
836            namespace,
837            event_name = name,
838            duration_ms = started_at.elapsed().as_millis() as u64,
839            "Shelly integration call executed"
840        );
841        Ok(())
842    }
843}
844
/// Test double for `JobOrchestrator`: tracks statuses, completion callbacks,
/// and a monotonically increasing job-id counter, all behind mutexes.
#[derive(Default)]
pub struct InMemoryJobOrchestrator {
    statuses: Mutex<HashMap<String, JobStatus>>,
    callbacks: Mutex<Vec<JobCompletionCallback>>,
    next_id: Mutex<u64>,
}
851
852impl InMemoryJobOrchestrator {
853    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
854        let status = self.with_status_mut(id, |status| {
855            status.state = JobState::Succeeded;
856            status.result = Some(result);
857            status.error = None;
858        })?;
859        self.notify(&status);
860        Ok(())
861    }
862
863    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
864        let status = self.with_status_mut(id, |status| {
865            status.state = JobState::Failed;
866            status.result = None;
867            status.error = Some(error);
868        })?;
869        self.notify(&status);
870        Ok(())
871    }
872
873    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
874    where
875        F: FnOnce(&mut JobStatus),
876    {
877        let mut statuses = self.statuses.lock().map_err(|_| {
878            IntegrationError::new(
879                "trigger",
880                IntegrationErrorKind::Unavailable,
881                "job status lock poisoned",
882            )
883        })?;
884        let Some(status) = statuses.get_mut(id) else {
885            return Err(IntegrationError::new(
886                "trigger",
887                IntegrationErrorKind::InvalidInput,
888                "job not found",
889            )
890            .with_code("job_not_found"));
891        };
892        status.attempts = status.attempts.saturating_add(1);
893        update(status);
894        Ok(status.clone())
895    }
896
897    fn notify(&self, status: &JobStatus) {
898        if let Ok(callbacks) = self.callbacks.lock() {
899            for callback in callbacks.iter() {
900                callback(status);
901            }
902        }
903    }
904
905    fn is_terminal(state: JobState) -> bool {
906        matches!(
907            state,
908            JobState::Succeeded | JobState::Failed | JobState::Canceled
909        )
910    }
911}
912
913impl JobOrchestrator for InMemoryJobOrchestrator {
    /// Assigns the next sequential id ("job-N"), records a `Queued` status,
    /// and returns a handle echoing the request's workflow/idempotency key.
    /// Logs the call to `shelly.integration.query`.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
        let started_at = Instant::now();
        // Cloned up front: `request` fields are moved into the handle below.
        let workflow = request.workflow.clone();
        let idempotency_key = request.idempotency_key.clone();
        let mut next_id = self.next_id.lock().map_err(|_| {
            IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "job id lock poisoned",
            )
        })?;
        *next_id = next_id.saturating_add(1);
        let id = format!("job-{next_id}");
        let status = JobStatus {
            id: id.clone(),
            state: JobState::Queued,
            attempts: 0,
            result: None,
            error: None,
        };
        // NOTE(review): the `next_id` guard is still held while taking the
        // `statuses` lock here — harmless for this in-memory double, but the
        // acquisition order is fixed by this method alone.
        self.statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .insert(id.clone(), status);
        let handle = JobHandle {
            id,
            workflow: request.workflow,
            idempotency_key: request.idempotency_key,
        };
        info!(
            target: "shelly.integration.query",
            source = "trigger",
            operation = "enqueue",
            workflow,
            idempotency_key,
            job_id = handle.id.as_str(),
            duration_ms = started_at.elapsed().as_millis() as u64,
            "Shelly integration call executed"
        );
        Ok(handle)
    }
961
    /// Returns a snapshot of the job's current status, or an `InvalidInput`
    /// error coded "job_not_found" for unknown ids. Logs either outcome to
    /// `shelly.integration.query`.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
        let started_at = Instant::now();
        let result = self
            .statuses
            .lock()
            .map_err(|_| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::Unavailable,
                    "job status lock poisoned",
                )
            })?
            .get(id)
            .cloned()
            .ok_or_else(|| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::InvalidInput,
                    "job not found",
                )
                .with_code("job_not_found")
            });
        match &result {
            Ok(status) => info!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                job_state = ?status.state,
                attempts = status.attempts,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration call executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "trigger",
                operation = "status",
                job_id = id,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration call failed"
            ),
        }
        result
    }
1007
1008    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1009        let started_at = Instant::now();
1010        let attempts = attempts.max(1);
1011        let mut polled = 0u32;
1012        for current in 1..=attempts {
1013            let status = self.status(id)?;
1014            polled = current;
1015            if Self::is_terminal(status.state) || current == attempts {
1016                info!(
1017                    target: "shelly.integration.query",
1018                    source = "trigger",
1019                    operation = "poll",
1020                    job_id = id,
1021                    attempts = polled,
1022                    backoff_ms,
1023                    terminal = Self::is_terminal(status.state),
1024                    job_state = ?status.state,
1025                    duration_ms = started_at.elapsed().as_millis() as u64,
1026                    "Shelly integration call executed"
1027                );
1028                return Ok(status);
1029            }
1030            if backoff_ms > 0 {
1031                thread::sleep(Duration::from_millis(backoff_ms));
1032            }
1033        }
1034
1035        let result = self.status(id);
1036        match &result {
1037            Ok(status) => info!(
1038                target: "shelly.integration.query",
1039                source = "trigger",
1040                operation = "poll",
1041                job_id = id,
1042                attempts = polled,
1043                backoff_ms,
1044                terminal = Self::is_terminal(status.state),
1045                job_state = ?status.state,
1046                duration_ms = started_at.elapsed().as_millis() as u64,
1047                "Shelly integration call executed"
1048            ),
1049            Err(err) => warn!(
1050                target: "shelly.integration.query",
1051                source = "trigger",
1052                operation = "poll",
1053                job_id = id,
1054                attempts = polled,
1055                backoff_ms,
1056                duration_ms = started_at.elapsed().as_millis() as u64,
1057                error = %err,
1058                "Shelly integration call failed"
1059            ),
1060        }
1061        result
1062    }
1063
1064    fn register_completion_callback(&self, callback: JobCompletionCallback) {
1065        if let Ok(mut callbacks) = self.callbacks.lock() {
1066            callbacks.push(callback);
1067            info!(
1068                target: "shelly.integration.query",
1069                source = "trigger",
1070                operation = "register_completion_callback",
1071                callback_count = callbacks.len(),
1072                "Shelly integration callback registered"
1073            );
1074        } else {
1075            warn!(
1076                target: "shelly.integration.query",
1077                source = "trigger",
1078                operation = "register_completion_callback",
1079                "Shelly integration callback registration failed"
1080            );
1081        }
1082    }
1083}
1084
/// Outcome of a single adapter conformance probe.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AdapterConformanceCheck {
    /// Probe identifier, e.g. `"singlestore.query"` or `"trigger.enqueue"`.
    pub name: String,
    /// Whether the probe succeeded.
    pub passed: bool,
    /// Human-readable success summary or the error text on failure.
    pub detail: String,
}
1091
/// Aggregated results of one conformance run across all adapter boundaries.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct AdapterConformanceReport {
    /// Individual probe outcomes, in execution order.
    pub checks: Vec<AdapterConformanceCheck>,
}
1096
1097impl AdapterConformanceReport {
1098    pub fn passed(&self) -> bool {
1099        self.checks.iter().all(|check| check.passed)
1100    }
1101
1102    pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1103        self.checks.push(AdapterConformanceCheck {
1104            name: name.into(),
1105            passed: true,
1106            detail: detail.into(),
1107        });
1108    }
1109
1110    pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1111        self.checks.push(AdapterConformanceCheck {
1112            name: name.into(),
1113            passed: false,
1114            detail: detail.into(),
1115        });
1116    }
1117}
1118
1119pub fn run_adapter_conformance_suite(
1120    singlestore: &dyn SingleStoreAdapter,
1121    opensearch: &dyn OpenSearchAdapter,
1122    trigger: &dyn TriggerDevAdapter,
1123    analytics: &dyn AnalyticsSink,
1124    context: &QueryContext,
1125) -> AdapterConformanceReport {
1126    let mut report = AdapterConformanceReport::default();
1127
1128    match singlestore.run_query(
1129        SqlCommand::new(
1130            "SELECT tenant, active_accounts FROM tenant_rollup",
1131            Vec::new(),
1132        ),
1133        context,
1134    ) {
1135        Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
1136        Err(err) => report.add_fail("singlestore.query", err.to_string()),
1137    }
1138
1139    match opensearch.search(SearchRequest::new("accounts", ""), context) {
1140        Ok(response) => report.add_pass(
1141            "opensearch.search",
1142            format!(
1143                "rows={} total_hits={}",
1144                response.rows.len(),
1145                response.total_hits
1146            ),
1147        ),
1148        Err(err) => report.add_fail("opensearch.search", err.to_string()),
1149    }
1150
1151    match trigger.trigger_workflow(JobRequest::new(
1152        "adapter_conformance_probe",
1153        serde_json::json!({"probe": true}),
1154        "adapter-conformance-probe",
1155    )) {
1156        Ok(handle) => {
1157            report.add_pass("trigger.enqueue", format!("id={}", handle.id));
1158            match trigger.workflow_status(&handle.id) {
1159                Ok(status) => report.add_pass(
1160                    "trigger.status",
1161                    format!("state={:?} attempts={}", status.state, status.attempts),
1162                ),
1163                Err(err) => report.add_fail("trigger.status", err.to_string()),
1164            }
1165        }
1166        Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
1167    }
1168
1169    let analytics_event = AnalyticsEvent::new(
1170        "adapter_conformance",
1171        "probe",
1172        serde_json::json!({
1173            "trace_id": context.trace_id,
1174            "correlation_id": context.correlation_id(),
1175            "request_id": context.request_id(),
1176        }),
1177    );
1178    match analytics.send_event(analytics_event) {
1179        Ok(()) => report.add_pass("analytics.emit", "event accepted"),
1180        Err(err) => report.add_fail("analytics.emit", err.to_string()),
1181    }
1182
1183    report
1184}
1185
1186pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1187    DataError::Integration(format!("[{}] {}", source.into(), err))
1188}
1189
1190pub fn map_integration_result<T>(
1191    source: impl Into<String>,
1192    result: IntegrationResult<T>,
1193) -> DataResult<T> {
1194    result.map_err(|err| map_integration_error(source, err))
1195}
1196
1197pub fn query_from_search(request: &SearchRequest) -> Query {
1198    let mut query = Query::new().paginate(request.page, request.per_page);
1199    for filter in &request.filters {
1200        query = query.where_filter(filter.clone());
1201    }
1202    query
1203}
1204
#[cfg(test)]
mod tests {
    //! Unit tests for the integration boundary: lifecycle hooks, retry
    //! policies, error mapping, the job-orchestration contract, the
    //! reference in-memory adapters, `QueryContext` helpers, call
    //! contracts, the Axiom telemetry bridge, and the adapter
    //! conformance suite.
    use super::{
        map_integration_result, run_adapter_conformance_suite, run_with_contract, run_with_retry,
        AdapterCallContract, AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge,
        ConnectionLifecycle, ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
        SingleStoreAdapter, SqlCommand, CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID,
        CONTEXT_TAG_TIMEOUT_MS,
    };
    use serde_json::{json, Value};
    use std::sync::{Arc, Mutex};

    /// Test hook that counts connect/disconnect invocations via shared
    /// counters, so tests can assert the lifecycle fired exactly once.
    struct CountingHook {
        connects: Arc<Mutex<u32>>,
        disconnects: Arc<Mutex<u32>>,
    }

    impl ConnectionLifecycleHook for CountingHook {
        fn on_connect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.connects.lock().unwrap();
            *guard += 1;
            Ok(())
        }

        fn on_disconnect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.disconnects.lock().unwrap();
            *guard += 1;
            Ok(())
        }
    }

    // Registered hooks fire once per connect() and once per disconnect().
    #[test]
    fn lifecycle_hooks_are_called() {
        let connects = Arc::new(Mutex::new(0));
        let disconnects = Arc::new(Mutex::new(0));
        let mut lifecycle = LifecycleHooks::new();
        lifecycle.register_hook(Arc::new(CountingHook {
            connects: connects.clone(),
            disconnects: disconnects.clone(),
        }));

        lifecycle.connect().unwrap();
        lifecycle.disconnect().unwrap();
        assert_eq!(*connects.lock().unwrap(), 1);
        assert_eq!(*disconnects.lock().unwrap(), 1);
    }

    // Transient errors are retried until success (here on attempt 3).
    #[test]
    fn retry_policy_retries_transient_errors() {
        let mut calls = 0u32;
        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            if attempt < 3 {
                Err(IntegrationError::new(
                    "opensearch",
                    IntegrationErrorKind::Transient,
                    "temporary failure",
                ))
            } else {
                Ok("ok")
            }
        })
        .unwrap();

        assert_eq!(result, "ok");
        assert_eq!(calls, 3);
    }

    // Permanent errors short-circuit after the first attempt.
    #[test]
    fn retry_policy_stops_on_permanent_errors() {
        let mut calls = 0u32;
        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            Err::<(), IntegrationError>(IntegrationError::new(
                "singlestore",
                IntegrationErrorKind::Permanent,
                "invalid sql",
            ))
        })
        .unwrap_err();

        assert_eq!(calls, 1);
        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
    }

    // map_integration_result wraps the error into DataError, preserving
    // the original message.
    #[test]
    fn integration_result_maps_into_data_error() {
        let mapped = map_integration_result::<()>(
            "trigger",
            Err(IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "service unavailable",
            )),
        )
        .unwrap_err();

        assert!(mapped.to_string().contains("service unavailable"));
    }

    /// Minimal JobOrchestrator used to verify the trait contract without
    /// the reference implementation's logging/callback machinery.
    #[derive(Default)]
    struct InMemoryJobs {
        statuses: Arc<Mutex<Vec<JobStatus>>>,
        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
    }

    impl JobOrchestrator for InMemoryJobs {
        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
            // Job ids are derived from the idempotency key so repeated
            // enqueues map to the same id.
            let id = format!("job-{}", request.idempotency_key);
            self.statuses.lock().unwrap().push(JobStatus {
                id: id.clone(),
                state: JobState::Queued,
                attempts: 0,
                result: None,
                error: None,
            });
            Ok(JobHandle {
                id,
                workflow: request.workflow,
                idempotency_key: request.idempotency_key,
            })
        }

        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
            self.statuses
                .lock()
                .unwrap()
                .iter()
                .find(|status| status.id == id)
                .cloned()
                .ok_or_else(|| {
                    IntegrationError::new(
                        "trigger",
                        IntegrationErrorKind::InvalidInput,
                        "job not found",
                    )
                })
        }

        fn poll(
            &self,
            id: &str,
            _attempts: u32,
            _backoff_ms: u64,
        ) -> super::IntegrationResult<JobStatus> {
            self.status(id)
        }

        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
            self.callbacks.lock().unwrap().push(callback);
        }
    }

    // Enqueue yields a handle whose status starts Queued; QueryContext
    // supports struct-update construction.
    #[test]
    fn job_orchestration_contract_supports_enqueue_and_status() {
        let jobs = InMemoryJobs::default();
        let handle = jobs
            .enqueue(JobRequest::new(
                "sync_customer",
                json!({"id": 42}),
                "idempotent-42",
            ))
            .unwrap();
        let status = jobs.status(&handle.id).unwrap();
        assert_eq!(status.state, JobState::Queued);
        assert_eq!(handle.idempotency_key, "idempotent-42");

        let ctx = QueryContext {
            tenant_id: Some("tenant-a".to_string()),
            ..QueryContext::default()
        };
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
    }

    // The reference SingleStore adapter returns its seeded rows through
    // the typed SQL boundary.
    #[test]
    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
            "region".to_string(),
            Value::String("EMEA".to_string()),
        )])]);
        let rows = adapter
            .run_query(
                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(
            rows[0].get("region"),
            Some(&Value::String("EMEA".to_string()))
        );
    }

    // The reference OpenSearch adapter filters seeded rows by query text.
    #[test]
    fn reference_opensearch_adapter_filters_rows() {
        let rows = vec![
            crate::StoredRow {
                id: 1,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Acme renewal".to_string()),
                )]),
            },
            crate::StoredRow {
                id: 2,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Globex onboarding".to_string()),
                )]),
            },
        ];
        let adapter = InMemoryOpenSearchAdapter::new(rows);
        let response = adapter
            .search(
                SearchRequest::new("accounts", "renewal"),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(response.total_hits, 1);
        assert_eq!(response.rows[0].id, 1);
    }

    // The in-memory Axiom sink records emitted events for inspection.
    #[test]
    fn reference_axiom_sink_records_events() {
        let sink = InMemoryAxiomSink::default();
        sink.send_event(AnalyticsEvent::new(
            "sales",
            "query_executed",
            json!({"latency_ms": 12}),
        ))
        .unwrap();
        let events = sink.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "query_executed");
    }

    // Completion callbacks fire on success and poll() observes the
    // terminal state and its result payload.
    #[test]
    fn reference_trigger_orchestrator_supports_completion_and_polling() {
        let orchestrator = InMemoryJobOrchestrator::default();
        let completed = Arc::new(Mutex::new(false));
        let completed_flag = completed.clone();
        orchestrator.register_completion_callback(Arc::new(move |status| {
            if status.state == JobState::Succeeded {
                if let Ok(mut guard) = completed_flag.lock() {
                    *guard = true;
                }
            }
        }));

        let handle = orchestrator
            .enqueue(JobRequest::new(
                "refresh_dashboard",
                json!({"account_id": 7}),
                "refresh-7",
            ))
            .unwrap();
        orchestrator
            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
            .unwrap();
        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
        assert_eq!(status.state, JobState::Succeeded);
        assert_eq!(
            status.result,
            Some(json!({
                "rows_synced": 18
            }))
        );
        assert!(*completed.lock().unwrap());
    }

    // Builder helpers store correlation/request/timeout/retry settings in
    // tags and the accessors decode them back.
    #[test]
    fn query_context_helpers_encode_trace_correlation_retry_timeout() {
        let context = QueryContext::default()
            .with_correlation_id("corr-42")
            .with_request_id("req-42")
            .with_timeout_ms(900)
            .with_retry_policy(RetryPolicy {
                max_attempts: 4,
                initial_backoff_ms: 25,
                max_backoff_ms: 100,
            });

        assert_eq!(
            context
                .tags
                .get(CONTEXT_TAG_CORRELATION_ID)
                .map(String::as_str),
            Some("corr-42")
        );
        assert_eq!(
            context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
            Some("req-42")
        );
        assert_eq!(
            context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
            Some("900")
        );
        assert_eq!(context.correlation_id(), Some("corr-42"));
        assert_eq!(context.request_id(), Some("req-42"));
        assert_eq!(context.timeout_ms(), Some(900));
        assert_eq!(
            context.retry_policy_override(),
            Some(RetryPolicy {
                max_attempts: 4,
                initial_backoff_ms: 25,
                max_backoff_ms: 100
            })
        );
    }

    // A context-level retry override (2 attempts) beats the contract's
    // RetryPolicy::never(), so the transient first attempt is retried.
    #[test]
    fn run_with_contract_respects_context_retry_override() {
        let context = QueryContext::default().with_retry_policy(RetryPolicy {
            max_attempts: 2,
            initial_backoff_ms: 0,
            max_backoff_ms: 0,
        });
        let mut attempts = 0u32;
        let result = run_with_contract(
            "opensearch",
            "search",
            AdapterCallContract::default()
                .with_retry_policy(RetryPolicy::never())
                .with_timeout_ms(1_000),
            &context,
            |attempt| {
                attempts = attempt;
                if attempt == 1 {
                    Err(IntegrationError::new(
                        "opensearch",
                        IntegrationErrorKind::Transient,
                        "temporary failure",
                    ))
                } else {
                    Ok("ok")
                }
            },
        )
        .unwrap();
        assert_eq!(result, "ok");
        assert_eq!(attempts, 2);
    }

    // An operation exceeding the context timeout (5ms budget, 15ms sleep)
    // yields a Timeout error with code "operation_timeout".
    #[test]
    fn run_with_contract_returns_timeout_error() {
        let context = QueryContext::default().with_timeout_ms(5);
        let err = run_with_contract(
            "singlestore",
            "run_query",
            AdapterCallContract::default(),
            &context,
            |_| {
                std::thread::sleep(std::time::Duration::from_millis(15));
                Ok::<_, IntegrationError>("done")
            },
        )
        .unwrap_err();
        assert_eq!(err.kind, IntegrationErrorKind::Timeout);
        assert_eq!(err.code.as_deref(), Some("operation_timeout"));
    }

    // The telemetry bridge copies trace/correlation/request ids from the
    // context into each emitted event payload.
    #[test]
    fn axiom_bridge_enriches_events_with_trace_and_correlation() {
        let sink = Arc::new(InMemoryAxiomSink::default());
        let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
        let context = QueryContext {
            tenant_id: Some("tenant-a".to_string()),
            trace_id: Some("trace-123".to_string()),
            tags: std::collections::BTreeMap::new(),
        }
        .with_correlation_id("corr-9")
        .with_request_id("req-9");
        bridge
            .emit("session_event", json!({"event":"patch"}), &context)
            .unwrap();
        let events = sink.events();
        assert_eq!(events.len(), 1);
        let payload = events[0].payload.as_object().unwrap();
        assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
        assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
        assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
    }

    // With all reference adapters wired together, the full conformance
    // suite passes and records at least four checks.
    #[test]
    fn conformance_suite_passes_with_reference_adapters() {
        let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
            [("tenant".to_string(), Value::String("north".to_string()))],
        )]);
        let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
            id: 1,
            data: std::collections::BTreeMap::from([(
                "title".to_string(),
                Value::String("Acme renewal".to_string()),
            )]),
        }]);
        let trigger = InMemoryJobOrchestrator::default();
        let analytics = InMemoryAxiomSink::default();
        let report = run_adapter_conformance_suite(
            &singlestore,
            &opensearch,
            &trigger,
            &analytics,
            &QueryContext::default(),
        );
        assert!(report.passed(), "report={report:?}");
        assert!(report.checks.len() >= 4);
    }
}