1use crate::{
2 error::{DataError, DataResult},
3 query::Query,
4 repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt,
11 sync::{Arc, Mutex},
12 thread,
13 time::{Duration, Instant},
14};
15use tracing::{info, warn};
16
17pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
18pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
19pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
20pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
21pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
22pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum IntegrationErrorKind {
27 Transient,
28 Permanent,
29 Auth,
30 RateLimited,
31 Timeout,
32 Unavailable,
33 InvalidInput,
34}
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37pub struct IntegrationError {
38 pub source: String,
39 pub kind: IntegrationErrorKind,
40 pub message: String,
41 pub code: Option<String>,
42 pub retryable: bool,
43}
44
45impl IntegrationError {
46 pub fn new(
47 source: impl Into<String>,
48 kind: IntegrationErrorKind,
49 message: impl Into<String>,
50 ) -> Self {
51 let kind_value = kind;
52 Self {
53 source: source.into(),
54 kind: kind_value,
55 message: message.into(),
56 code: None,
57 retryable: matches!(
58 kind_value,
59 IntegrationErrorKind::Transient
60 | IntegrationErrorKind::RateLimited
61 | IntegrationErrorKind::Timeout
62 | IntegrationErrorKind::Unavailable
63 ),
64 }
65 }
66
67 pub fn with_code(mut self, code: impl Into<String>) -> Self {
68 self.code = Some(code.into());
69 self
70 }
71}
72
73impl fmt::Display for IntegrationError {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 if let Some(code) = &self.code {
76 write!(f, "[{}:{}] {}", self.source, code, self.message)
77 } else {
78 write!(f, "[{}] {}", self.source, self.message)
79 }
80 }
81}
82
83impl std::error::Error for IntegrationError {}
84
85pub type IntegrationResult<T> = Result<T, IntegrationError>;
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
88pub struct RetryPolicy {
89 pub max_attempts: u32,
90 pub initial_backoff_ms: u64,
91 pub max_backoff_ms: u64,
92}
93
94impl RetryPolicy {
95 pub fn conservative() -> Self {
96 Self {
97 max_attempts: 3,
98 initial_backoff_ms: 50,
99 max_backoff_ms: 500,
100 }
101 }
102
103 pub fn never() -> Self {
104 Self {
105 max_attempts: 1,
106 initial_backoff_ms: 0,
107 max_backoff_ms: 0,
108 }
109 }
110}
111
112impl Default for RetryPolicy {
113 fn default() -> Self {
114 Self::conservative()
115 }
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
119pub struct AdapterCallContract {
120 pub retry_policy: RetryPolicy,
121 pub timeout_ms: u64,
122}
123
124impl AdapterCallContract {
125 pub fn low_latency() -> Self {
126 Self {
127 retry_policy: RetryPolicy::never(),
128 timeout_ms: 250,
129 }
130 }
131
132 pub fn default_query() -> Self {
133 Self {
134 retry_policy: RetryPolicy::conservative(),
135 timeout_ms: 1_500,
136 }
137 }
138
139 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
140 self.retry_policy = retry_policy;
141 self
142 }
143
144 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
145 self.timeout_ms = timeout_ms.max(1);
146 self
147 }
148}
149
150impl Default for AdapterCallContract {
151 fn default() -> Self {
152 Self::default_query()
153 }
154}
155
156pub fn run_with_contract<T, F>(
157 source: &str,
158 operation: &str,
159 contract: AdapterCallContract,
160 context: &QueryContext,
161 mut operation_fn: F,
162) -> IntegrationResult<T>
163where
164 F: FnMut(u32) -> IntegrationResult<T>,
165{
166 let policy = context
167 .retry_policy_override()
168 .unwrap_or(contract.retry_policy);
169 let attempts = policy.max_attempts.max(1);
170 let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
171 let mut backoff_ms = policy.initial_backoff_ms;
172
173 for attempt in 1..=attempts {
174 let started_at = Instant::now();
175 let result = operation_fn(attempt);
176 let elapsed_ms = started_at.elapsed().as_millis() as u64;
177 if elapsed_ms > timeout_ms {
178 return Err(IntegrationError::new(
179 source,
180 IntegrationErrorKind::Timeout,
181 format!(
182 "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
183 elapsed_ms, timeout_ms
184 ),
185 )
186 .with_code("operation_timeout"));
187 }
188
189 match result {
190 Ok(value) => return Ok(value),
191 Err(err) if !err.retryable || attempt == attempts => return Err(err),
192 Err(err) => {
193 warn!(
194 target: "shelly.integration.query",
195 source,
196 operation,
197 attempt,
198 max_attempts = attempts,
199 tenant_id = ?context.tenant_id,
200 trace_id = ?context.trace_id,
201 correlation_id = context.correlation_id().unwrap_or("-"),
202 request_id = context.request_id().unwrap_or("-"),
203 timeout_ms,
204 backoff_ms,
205 error = %err,
206 "Shelly integration retrying transient failure"
207 );
208 if backoff_ms > 0 {
209 thread::sleep(Duration::from_millis(backoff_ms));
210 }
211 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
212 }
213 }
214 }
215
216 Err(IntegrationError::new(
217 source,
218 IntegrationErrorKind::Transient,
219 format!("{operation} retry exhausted"),
220 )
221 .with_code("retry_exhausted"))
222}
223
224pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
225where
226 F: FnMut(u32) -> IntegrationResult<T>,
227{
228 let attempts = policy.max_attempts.max(1);
229 let mut backoff_ms = policy.initial_backoff_ms;
230
231 for attempt in 1..=attempts {
232 match operation(attempt) {
233 Ok(value) => return Ok(value),
234 Err(err) if !err.retryable || attempt == attempts => return Err(err),
235 Err(_) => {
236 if backoff_ms > 0 {
237 thread::sleep(Duration::from_millis(backoff_ms));
238 }
239 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
240 }
241 }
242 }
243
244 Err(IntegrationError::new(
245 "retry",
246 IntegrationErrorKind::Transient,
247 "retry exhausted",
248 ))
249}
250
251pub trait ConnectionLifecycleHook: Send + Sync {
252 fn on_connect(&self) -> IntegrationResult<()> {
253 Ok(())
254 }
255
256 fn on_disconnect(&self) -> IntegrationResult<()> {
257 Ok(())
258 }
259}
260
261pub trait ConnectionLifecycle {
262 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
263 fn connect(&self) -> IntegrationResult<()>;
264 fn disconnect(&self) -> IntegrationResult<()>;
265}
266
267#[derive(Default)]
268pub struct LifecycleHooks {
269 hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
270}
271
272impl LifecycleHooks {
273 pub fn new() -> Self {
274 Self::default()
275 }
276}
277
278impl ConnectionLifecycle for LifecycleHooks {
279 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
280 self.hooks.push(hook);
281 }
282
283 fn connect(&self) -> IntegrationResult<()> {
284 for hook in &self.hooks {
285 hook.on_connect()?;
286 }
287 Ok(())
288 }
289
290 fn disconnect(&self) -> IntegrationResult<()> {
291 for hook in &self.hooks {
292 hook.on_disconnect()?;
293 }
294 Ok(())
295 }
296}
297
298#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
299pub struct QueryContext {
300 pub tenant_id: Option<String>,
301 pub trace_id: Option<String>,
302 pub tags: BTreeMap<String, String>,
303}
304
305impl QueryContext {
306 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
307 self.tags.insert(key.into(), value.into());
308 self
309 }
310
311 pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
312 self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
313 }
314
315 pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
316 self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
317 }
318
319 pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
320 self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
321 }
322
323 pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
324 self.with_tag(
325 CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
326 policy.max_attempts.to_string(),
327 )
328 .with_tag(
329 CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
330 policy.initial_backoff_ms.to_string(),
331 )
332 .with_tag(
333 CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
334 policy.max_backoff_ms.to_string(),
335 )
336 }
337
338 pub fn correlation_id(&self) -> Option<&str> {
339 self.tags
340 .get(CONTEXT_TAG_CORRELATION_ID)
341 .map(String::as_str)
342 .map(str::trim)
343 .filter(|value| !value.is_empty())
344 }
345
346 pub fn request_id(&self) -> Option<&str> {
347 self.tags
348 .get(CONTEXT_TAG_REQUEST_ID)
349 .map(String::as_str)
350 .map(str::trim)
351 .filter(|value| !value.is_empty())
352 }
353
354 pub fn timeout_ms(&self) -> Option<u64> {
355 self.tags
356 .get(CONTEXT_TAG_TIMEOUT_MS)
357 .and_then(|value| value.parse::<u64>().ok())
358 .filter(|value| *value > 0)
359 }
360
361 pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
362 let max_attempts = self
363 .tags
364 .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
365 .and_then(|value| value.parse::<u32>().ok())?;
366 let initial_backoff_ms = self
367 .tags
368 .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
369 .and_then(|value| value.parse::<u64>().ok())
370 .unwrap_or(0);
371 let max_backoff_ms = self
372 .tags
373 .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
374 .and_then(|value| value.parse::<u64>().ok())
375 .unwrap_or(initial_backoff_ms);
376 Some(RetryPolicy {
377 max_attempts: max_attempts.max(1),
378 initial_backoff_ms,
379 max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
380 })
381 }
382}
383
384pub trait TypedQueryBoundary {
385 type Request: Send + Sync;
386 type Response: Send + Sync;
387
388 fn execute(
389 &self,
390 request: &Self::Request,
391 context: &QueryContext,
392 ) -> IntegrationResult<Self::Response>;
393}
394
395#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
396pub struct SqlCommand {
397 pub statement: String,
398 pub params: Vec<Value>,
399}
400
401impl SqlCommand {
402 pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
403 Self {
404 statement: statement.into(),
405 params,
406 }
407 }
408}
409
410pub trait SingleStoreAdapter:
411 TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
412{
413 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
414 let started_at = Instant::now();
415 let statement = query.statement.as_str();
416 let param_count = query.params.len();
417 let result = self.execute(&query, context);
418 match &result {
419 Ok(rows) => info!(
420 target: "shelly.integration.query",
421 source = "singlestore",
422 operation = "run_query",
423 tenant_id = ?context.tenant_id,
424 trace_id = ?context.trace_id,
425 tag_count = context.tags.len(),
426 statement,
427 param_count,
428 row_count = rows.len(),
429 duration_ms = started_at.elapsed().as_millis() as u64,
430 "Shelly integration query executed"
431 ),
432 Err(err) => warn!(
433 target: "shelly.integration.query",
434 source = "singlestore",
435 operation = "run_query",
436 tenant_id = ?context.tenant_id,
437 trace_id = ?context.trace_id,
438 tag_count = context.tags.len(),
439 statement,
440 param_count,
441 duration_ms = started_at.elapsed().as_millis() as u64,
442 error = %err,
443 "Shelly integration query failed"
444 ),
445 }
446 result
447 }
448}
449
450#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
451pub struct SearchRequest {
452 pub index: String,
453 pub text: String,
454 pub filters: Vec<crate::query::Filter>,
455 pub page: usize,
456 pub per_page: usize,
457}
458
459impl SearchRequest {
460 pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
461 Self {
462 index: index.into(),
463 text: text.into(),
464 filters: Vec::new(),
465 page: 1,
466 per_page: 25,
467 }
468 }
469
470 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
471 self.filters.push(filter);
472 self
473 }
474}
475
476#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
477pub struct SearchResponse {
478 pub total_hits: usize,
479 pub rows: Vec<StoredRow>,
480}
481
482pub trait OpenSearchAdapter:
483 TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
484{
485 fn search(
486 &self,
487 request: SearchRequest,
488 context: &QueryContext,
489 ) -> IntegrationResult<SearchResponse> {
490 let started_at = Instant::now();
491 let index = request.index.as_str();
492 let text = request.text.as_str();
493 let filter_count = request.filters.len();
494 let page = request.page;
495 let per_page = request.per_page;
496 let result = self.execute(&request, context);
497 match &result {
498 Ok(response) => info!(
499 target: "shelly.integration.query",
500 source = "opensearch",
501 operation = "search",
502 tenant_id = ?context.tenant_id,
503 trace_id = ?context.trace_id,
504 tag_count = context.tags.len(),
505 index,
506 text,
507 filter_count,
508 page,
509 per_page,
510 row_count = response.rows.len(),
511 total_hits = response.total_hits,
512 duration_ms = started_at.elapsed().as_millis() as u64,
513 "Shelly integration query executed"
514 ),
515 Err(err) => warn!(
516 target: "shelly.integration.query",
517 source = "opensearch",
518 operation = "search",
519 tenant_id = ?context.tenant_id,
520 trace_id = ?context.trace_id,
521 tag_count = context.tags.len(),
522 index,
523 text,
524 filter_count,
525 page,
526 per_page,
527 duration_ms = started_at.elapsed().as_millis() as u64,
528 error = %err,
529 "Shelly integration query failed"
530 ),
531 }
532 result
533 }
534}
535
536pub trait AnalyticsSink: Send + Sync {
537 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
538}
539
540#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
541pub struct AnalyticsEvent {
542 pub namespace: String,
543 pub name: String,
544 pub payload: Value,
545}
546
547impl AnalyticsEvent {
548 pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
549 Self {
550 namespace: namespace.into(),
551 name: name.into(),
552 payload,
553 }
554 }
555}
556
557#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
558pub struct JobRequest {
559 pub workflow: String,
560 pub payload: Value,
561 pub idempotency_key: String,
562 pub metadata: BTreeMap<String, String>,
563}
564
565impl JobRequest {
566 pub fn new(
567 workflow: impl Into<String>,
568 payload: Value,
569 idempotency_key: impl Into<String>,
570 ) -> Self {
571 Self {
572 workflow: workflow.into(),
573 payload,
574 idempotency_key: idempotency_key.into(),
575 metadata: BTreeMap::new(),
576 }
577 }
578}
579
580#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
581#[serde(rename_all = "snake_case")]
582pub enum JobState {
583 Queued,
584 Running,
585 Succeeded,
586 Failed,
587 Canceled,
588}
589
590#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
591pub struct JobStatus {
592 pub id: String,
593 pub state: JobState,
594 pub attempts: u32,
595 pub result: Option<Value>,
596 pub error: Option<IntegrationError>,
597}
598
599#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
600pub struct JobHandle {
601 pub id: String,
602 pub workflow: String,
603 pub idempotency_key: String,
604}
605
606pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
607
608pub trait JobOrchestrator: Send + Sync {
609 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
610 fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
611 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
612 fn register_completion_callback(&self, callback: JobCompletionCallback);
613}
614
615pub trait TriggerDevAdapter: Send + Sync {
616 fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
617 fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
618 fn poll_workflow(
619 &self,
620 id: &str,
621 attempts: u32,
622 backoff_ms: u64,
623 ) -> IntegrationResult<JobStatus>;
624}
625
626impl<T> TriggerDevAdapter for T
627where
628 T: JobOrchestrator + Send + Sync,
629{
630 fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
631 self.enqueue(request)
632 }
633
634 fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
635 self.status(id)
636 }
637
638 fn poll_workflow(
639 &self,
640 id: &str,
641 attempts: u32,
642 backoff_ms: u64,
643 ) -> IntegrationResult<JobStatus> {
644 self.poll(id, attempts, backoff_ms)
645 }
646}
647
648#[derive(Clone)]
649pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
650 sink: Arc<S>,
651 namespace: String,
652}
653
654impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
655 pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
656 Self {
657 sink,
658 namespace: namespace.into(),
659 }
660 }
661
662 pub fn emit(
663 &self,
664 name: impl Into<String>,
665 payload: Value,
666 context: &QueryContext,
667 ) -> IntegrationResult<()> {
668 let mut envelope = serde_json::Map::new();
669 envelope.insert("payload".to_string(), payload);
670 envelope.insert(
671 "tenant_id".to_string(),
672 context
673 .tenant_id
674 .as_ref()
675 .map(|value| Value::String(value.clone()))
676 .unwrap_or(Value::Null),
677 );
678 envelope.insert(
679 "trace_id".to_string(),
680 context
681 .trace_id
682 .as_ref()
683 .map(|value| Value::String(value.clone()))
684 .unwrap_or(Value::Null),
685 );
686 envelope.insert(
687 "correlation_id".to_string(),
688 context
689 .correlation_id()
690 .map(|value| Value::String(value.to_string()))
691 .unwrap_or(Value::Null),
692 );
693 envelope.insert(
694 "request_id".to_string(),
695 context
696 .request_id()
697 .map(|value| Value::String(value.to_string()))
698 .unwrap_or(Value::Null),
699 );
700 envelope.insert("tags".to_string(), serde_json::json!(context.tags));
701
702 self.sink.send_event(AnalyticsEvent::new(
703 self.namespace.clone(),
704 name.into(),
705 Value::Object(envelope),
706 ))
707 }
708}
709
710#[derive(Debug, Clone, Default)]
711pub struct InMemorySingleStoreAdapter {
712 rows: Vec<Row>,
713}
714
715impl InMemorySingleStoreAdapter {
716 pub fn new(rows: Vec<Row>) -> Self {
717 Self { rows }
718 }
719}
720
721impl TypedQueryBoundary for InMemorySingleStoreAdapter {
722 type Request = SqlCommand;
723 type Response = Vec<Row>;
724
725 fn execute(
726 &self,
727 request: &Self::Request,
728 _context: &QueryContext,
729 ) -> IntegrationResult<Self::Response> {
730 if request.statement.trim().is_empty() {
731 return Err(IntegrationError::new(
732 "singlestore",
733 IntegrationErrorKind::InvalidInput,
734 "empty SQL statement",
735 )
736 .with_code("empty_statement"));
737 }
738 Ok(self.rows.clone())
739 }
740}
741
742impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
743
744#[derive(Debug, Clone, Default)]
745pub struct InMemoryOpenSearchAdapter {
746 rows: Vec<StoredRow>,
747}
748
749impl InMemoryOpenSearchAdapter {
750 pub fn new(rows: Vec<StoredRow>) -> Self {
751 Self { rows }
752 }
753}
754
755impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
756 type Request = SearchRequest;
757 type Response = SearchResponse;
758
759 fn execute(
760 &self,
761 request: &Self::Request,
762 _context: &QueryContext,
763 ) -> IntegrationResult<Self::Response> {
764 if request.index.trim().is_empty() {
765 return Err(IntegrationError::new(
766 "opensearch",
767 IntegrationErrorKind::InvalidInput,
768 "search index must not be empty",
769 )
770 .with_code("empty_index"));
771 }
772 let needle = request.text.trim().to_lowercase();
773 let mut filtered = self
774 .rows
775 .iter()
776 .filter(|row| {
777 if needle.is_empty() {
778 return true;
779 }
780 row.data.values().any(|value| {
781 value
782 .as_str()
783 .map(|text| text.to_lowercase().contains(&needle))
784 .unwrap_or(false)
785 })
786 })
787 .cloned()
788 .collect::<Vec<_>>();
789 let total_hits = filtered.len();
790 let page = request.page.max(1);
791 let per_page = request.per_page.max(1);
792 let offset = (page - 1) * per_page;
793 filtered = filtered.into_iter().skip(offset).take(per_page).collect();
794 Ok(SearchResponse {
795 total_hits,
796 rows: filtered,
797 })
798 }
799}
800
801impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
802
803#[derive(Debug, Default, Clone)]
804pub struct InMemoryAxiomSink {
805 events: Arc<Mutex<Vec<AnalyticsEvent>>>,
806}
807
808impl InMemoryAxiomSink {
809 pub fn events(&self) -> Vec<AnalyticsEvent> {
810 self.events
811 .lock()
812 .map(|events| events.clone())
813 .unwrap_or_default()
814 }
815}
816
817impl AnalyticsSink for InMemoryAxiomSink {
818 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
819 let started_at = Instant::now();
820 let namespace = event.namespace.clone();
821 let name = event.name.clone();
822 self.events
823 .lock()
824 .map_err(|_| {
825 IntegrationError::new(
826 "axiom",
827 IntegrationErrorKind::Unavailable,
828 "analytics sink lock poisoned",
829 )
830 })?
831 .push(event);
832 info!(
833 target: "shelly.integration.query",
834 source = "axiom",
835 operation = "send_event",
836 namespace,
837 event_name = name,
838 duration_ms = started_at.elapsed().as_millis() as u64,
839 "Shelly integration call executed"
840 );
841 Ok(())
842 }
843}
844
845#[derive(Default)]
846pub struct InMemoryJobOrchestrator {
847 statuses: Mutex<HashMap<String, JobStatus>>,
848 callbacks: Mutex<Vec<JobCompletionCallback>>,
849 next_id: Mutex<u64>,
850}
851
852impl InMemoryJobOrchestrator {
853 pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
854 let status = self.with_status_mut(id, |status| {
855 status.state = JobState::Succeeded;
856 status.result = Some(result);
857 status.error = None;
858 })?;
859 self.notify(&status);
860 Ok(())
861 }
862
863 pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
864 let status = self.with_status_mut(id, |status| {
865 status.state = JobState::Failed;
866 status.result = None;
867 status.error = Some(error);
868 })?;
869 self.notify(&status);
870 Ok(())
871 }
872
873 fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
874 where
875 F: FnOnce(&mut JobStatus),
876 {
877 let mut statuses = self.statuses.lock().map_err(|_| {
878 IntegrationError::new(
879 "trigger",
880 IntegrationErrorKind::Unavailable,
881 "job status lock poisoned",
882 )
883 })?;
884 let Some(status) = statuses.get_mut(id) else {
885 return Err(IntegrationError::new(
886 "trigger",
887 IntegrationErrorKind::InvalidInput,
888 "job not found",
889 )
890 .with_code("job_not_found"));
891 };
892 status.attempts = status.attempts.saturating_add(1);
893 update(status);
894 Ok(status.clone())
895 }
896
897 fn notify(&self, status: &JobStatus) {
898 if let Ok(callbacks) = self.callbacks.lock() {
899 for callback in callbacks.iter() {
900 callback(status);
901 }
902 }
903 }
904
905 fn is_terminal(state: JobState) -> bool {
906 matches!(
907 state,
908 JobState::Succeeded | JobState::Failed | JobState::Canceled
909 )
910 }
911}
912
913impl JobOrchestrator for InMemoryJobOrchestrator {
914 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
915 let started_at = Instant::now();
916 let workflow = request.workflow.clone();
917 let idempotency_key = request.idempotency_key.clone();
918 let mut next_id = self.next_id.lock().map_err(|_| {
919 IntegrationError::new(
920 "trigger",
921 IntegrationErrorKind::Unavailable,
922 "job id lock poisoned",
923 )
924 })?;
925 *next_id = next_id.saturating_add(1);
926 let id = format!("job-{next_id}");
927 let status = JobStatus {
928 id: id.clone(),
929 state: JobState::Queued,
930 attempts: 0,
931 result: None,
932 error: None,
933 };
934 self.statuses
935 .lock()
936 .map_err(|_| {
937 IntegrationError::new(
938 "trigger",
939 IntegrationErrorKind::Unavailable,
940 "job status lock poisoned",
941 )
942 })?
943 .insert(id.clone(), status);
944 let handle = JobHandle {
945 id,
946 workflow: request.workflow,
947 idempotency_key: request.idempotency_key,
948 };
949 info!(
950 target: "shelly.integration.query",
951 source = "trigger",
952 operation = "enqueue",
953 workflow,
954 idempotency_key,
955 job_id = handle.id.as_str(),
956 duration_ms = started_at.elapsed().as_millis() as u64,
957 "Shelly integration call executed"
958 );
959 Ok(handle)
960 }
961
962 fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
963 let started_at = Instant::now();
964 let result = self
965 .statuses
966 .lock()
967 .map_err(|_| {
968 IntegrationError::new(
969 "trigger",
970 IntegrationErrorKind::Unavailable,
971 "job status lock poisoned",
972 )
973 })?
974 .get(id)
975 .cloned()
976 .ok_or_else(|| {
977 IntegrationError::new(
978 "trigger",
979 IntegrationErrorKind::InvalidInput,
980 "job not found",
981 )
982 .with_code("job_not_found")
983 });
984 match &result {
985 Ok(status) => info!(
986 target: "shelly.integration.query",
987 source = "trigger",
988 operation = "status",
989 job_id = id,
990 job_state = ?status.state,
991 attempts = status.attempts,
992 duration_ms = started_at.elapsed().as_millis() as u64,
993 "Shelly integration call executed"
994 ),
995 Err(err) => warn!(
996 target: "shelly.integration.query",
997 source = "trigger",
998 operation = "status",
999 job_id = id,
1000 duration_ms = started_at.elapsed().as_millis() as u64,
1001 error = %err,
1002 "Shelly integration call failed"
1003 ),
1004 }
1005 result
1006 }
1007
1008 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1009 let started_at = Instant::now();
1010 let attempts = attempts.max(1);
1011 let mut polled = 0u32;
1012 for current in 1..=attempts {
1013 let status = self.status(id)?;
1014 polled = current;
1015 if Self::is_terminal(status.state) || current == attempts {
1016 info!(
1017 target: "shelly.integration.query",
1018 source = "trigger",
1019 operation = "poll",
1020 job_id = id,
1021 attempts = polled,
1022 backoff_ms,
1023 terminal = Self::is_terminal(status.state),
1024 job_state = ?status.state,
1025 duration_ms = started_at.elapsed().as_millis() as u64,
1026 "Shelly integration call executed"
1027 );
1028 return Ok(status);
1029 }
1030 if backoff_ms > 0 {
1031 thread::sleep(Duration::from_millis(backoff_ms));
1032 }
1033 }
1034
1035 let result = self.status(id);
1036 match &result {
1037 Ok(status) => info!(
1038 target: "shelly.integration.query",
1039 source = "trigger",
1040 operation = "poll",
1041 job_id = id,
1042 attempts = polled,
1043 backoff_ms,
1044 terminal = Self::is_terminal(status.state),
1045 job_state = ?status.state,
1046 duration_ms = started_at.elapsed().as_millis() as u64,
1047 "Shelly integration call executed"
1048 ),
1049 Err(err) => warn!(
1050 target: "shelly.integration.query",
1051 source = "trigger",
1052 operation = "poll",
1053 job_id = id,
1054 attempts = polled,
1055 backoff_ms,
1056 duration_ms = started_at.elapsed().as_millis() as u64,
1057 error = %err,
1058 "Shelly integration call failed"
1059 ),
1060 }
1061 result
1062 }
1063
1064 fn register_completion_callback(&self, callback: JobCompletionCallback) {
1065 if let Ok(mut callbacks) = self.callbacks.lock() {
1066 callbacks.push(callback);
1067 info!(
1068 target: "shelly.integration.query",
1069 source = "trigger",
1070 operation = "register_completion_callback",
1071 callback_count = callbacks.len(),
1072 "Shelly integration callback registered"
1073 );
1074 } else {
1075 warn!(
1076 target: "shelly.integration.query",
1077 source = "trigger",
1078 operation = "register_completion_callback",
1079 "Shelly integration callback registration failed"
1080 );
1081 }
1082 }
1083}
1084
1085#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1086pub struct AdapterConformanceCheck {
1087 pub name: String,
1088 pub passed: bool,
1089 pub detail: String,
1090}
1091
1092#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
1093pub struct AdapterConformanceReport {
1094 pub checks: Vec<AdapterConformanceCheck>,
1095}
1096
1097impl AdapterConformanceReport {
1098 pub fn passed(&self) -> bool {
1099 self.checks.iter().all(|check| check.passed)
1100 }
1101
1102 pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1103 self.checks.push(AdapterConformanceCheck {
1104 name: name.into(),
1105 passed: true,
1106 detail: detail.into(),
1107 });
1108 }
1109
1110 pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1111 self.checks.push(AdapterConformanceCheck {
1112 name: name.into(),
1113 passed: false,
1114 detail: detail.into(),
1115 });
1116 }
1117}
1118
1119pub fn run_adapter_conformance_suite(
1120 singlestore: &dyn SingleStoreAdapter,
1121 opensearch: &dyn OpenSearchAdapter,
1122 trigger: &dyn TriggerDevAdapter,
1123 analytics: &dyn AnalyticsSink,
1124 context: &QueryContext,
1125) -> AdapterConformanceReport {
1126 let mut report = AdapterConformanceReport::default();
1127
1128 match singlestore.run_query(
1129 SqlCommand::new(
1130 "SELECT tenant, active_accounts FROM tenant_rollup",
1131 Vec::new(),
1132 ),
1133 context,
1134 ) {
1135 Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
1136 Err(err) => report.add_fail("singlestore.query", err.to_string()),
1137 }
1138
1139 match opensearch.search(SearchRequest::new("accounts", ""), context) {
1140 Ok(response) => report.add_pass(
1141 "opensearch.search",
1142 format!(
1143 "rows={} total_hits={}",
1144 response.rows.len(),
1145 response.total_hits
1146 ),
1147 ),
1148 Err(err) => report.add_fail("opensearch.search", err.to_string()),
1149 }
1150
1151 match trigger.trigger_workflow(JobRequest::new(
1152 "adapter_conformance_probe",
1153 serde_json::json!({"probe": true}),
1154 "adapter-conformance-probe",
1155 )) {
1156 Ok(handle) => {
1157 report.add_pass("trigger.enqueue", format!("id={}", handle.id));
1158 match trigger.workflow_status(&handle.id) {
1159 Ok(status) => report.add_pass(
1160 "trigger.status",
1161 format!("state={:?} attempts={}", status.state, status.attempts),
1162 ),
1163 Err(err) => report.add_fail("trigger.status", err.to_string()),
1164 }
1165 }
1166 Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
1167 }
1168
1169 let analytics_event = AnalyticsEvent::new(
1170 "adapter_conformance",
1171 "probe",
1172 serde_json::json!({
1173 "trace_id": context.trace_id,
1174 "correlation_id": context.correlation_id(),
1175 "request_id": context.request_id(),
1176 }),
1177 );
1178 match analytics.send_event(analytics_event) {
1179 Ok(()) => report.add_pass("analytics.emit", "event accepted"),
1180 Err(err) => report.add_fail("analytics.emit", err.to_string()),
1181 }
1182
1183 report
1184}
1185
1186pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1187 DataError::Integration(format!("[{}] {}", source.into(), err))
1188}
1189
1190pub fn map_integration_result<T>(
1191 source: impl Into<String>,
1192 result: IntegrationResult<T>,
1193) -> DataResult<T> {
1194 result.map_err(|err| map_integration_error(source, err))
1195}
1196
1197pub fn query_from_search(request: &SearchRequest) -> Query {
1198 let mut query = Query::new().paginate(request.page, request.per_page);
1199 for filter in &request.filters {
1200 query = query.where_filter(filter.clone());
1201 }
1202 query
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207 use super::{
1208 map_integration_result, run_adapter_conformance_suite, run_with_contract, run_with_retry,
1209 AdapterCallContract, AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge,
1210 ConnectionLifecycle, ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
1211 InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
1212 IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
1213 LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
1214 SingleStoreAdapter, SqlCommand, CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID,
1215 CONTEXT_TAG_TIMEOUT_MS,
1216 };
1217 use serde_json::{json, Value};
1218 use std::sync::{Arc, Mutex};
1219
1220 struct CountingHook {
1221 connects: Arc<Mutex<u32>>,
1222 disconnects: Arc<Mutex<u32>>,
1223 }
1224
1225 impl ConnectionLifecycleHook for CountingHook {
1226 fn on_connect(&self) -> super::IntegrationResult<()> {
1227 let mut guard = self.connects.lock().unwrap();
1228 *guard += 1;
1229 Ok(())
1230 }
1231
1232 fn on_disconnect(&self) -> super::IntegrationResult<()> {
1233 let mut guard = self.disconnects.lock().unwrap();
1234 *guard += 1;
1235 Ok(())
1236 }
1237 }
1238
1239 #[test]
1240 fn lifecycle_hooks_are_called() {
1241 let connects = Arc::new(Mutex::new(0));
1242 let disconnects = Arc::new(Mutex::new(0));
1243 let mut lifecycle = LifecycleHooks::new();
1244 lifecycle.register_hook(Arc::new(CountingHook {
1245 connects: connects.clone(),
1246 disconnects: disconnects.clone(),
1247 }));
1248
1249 lifecycle.connect().unwrap();
1250 lifecycle.disconnect().unwrap();
1251 assert_eq!(*connects.lock().unwrap(), 1);
1252 assert_eq!(*disconnects.lock().unwrap(), 1);
1253 }
1254
1255 #[test]
1256 fn retry_policy_retries_transient_errors() {
1257 let mut calls = 0u32;
1258 let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
1259 calls = attempt;
1260 if attempt < 3 {
1261 Err(IntegrationError::new(
1262 "opensearch",
1263 IntegrationErrorKind::Transient,
1264 "temporary failure",
1265 ))
1266 } else {
1267 Ok("ok")
1268 }
1269 })
1270 .unwrap();
1271
1272 assert_eq!(result, "ok");
1273 assert_eq!(calls, 3);
1274 }
1275
1276 #[test]
1277 fn retry_policy_stops_on_permanent_errors() {
1278 let mut calls = 0u32;
1279 let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
1280 calls = attempt;
1281 Err::<(), IntegrationError>(IntegrationError::new(
1282 "singlestore",
1283 IntegrationErrorKind::Permanent,
1284 "invalid sql",
1285 ))
1286 })
1287 .unwrap_err();
1288
1289 assert_eq!(calls, 1);
1290 assert_eq!(err.kind, IntegrationErrorKind::Permanent);
1291 }
1292
1293 #[test]
1294 fn integration_result_maps_into_data_error() {
1295 let mapped = map_integration_result::<()>(
1296 "trigger",
1297 Err(IntegrationError::new(
1298 "trigger",
1299 IntegrationErrorKind::Unavailable,
1300 "service unavailable",
1301 )),
1302 )
1303 .unwrap_err();
1304
1305 assert!(mapped.to_string().contains("service unavailable"));
1306 }
1307
1308 #[derive(Default)]
1309 struct InMemoryJobs {
1310 statuses: Arc<Mutex<Vec<JobStatus>>>,
1311 callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
1312 }
1313
1314 impl JobOrchestrator for InMemoryJobs {
1315 fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
1316 let id = format!("job-{}", request.idempotency_key);
1317 self.statuses.lock().unwrap().push(JobStatus {
1318 id: id.clone(),
1319 state: JobState::Queued,
1320 attempts: 0,
1321 result: None,
1322 error: None,
1323 });
1324 Ok(JobHandle {
1325 id,
1326 workflow: request.workflow,
1327 idempotency_key: request.idempotency_key,
1328 })
1329 }
1330
1331 fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
1332 self.statuses
1333 .lock()
1334 .unwrap()
1335 .iter()
1336 .find(|status| status.id == id)
1337 .cloned()
1338 .ok_or_else(|| {
1339 IntegrationError::new(
1340 "trigger",
1341 IntegrationErrorKind::InvalidInput,
1342 "job not found",
1343 )
1344 })
1345 }
1346
1347 fn poll(
1348 &self,
1349 id: &str,
1350 _attempts: u32,
1351 _backoff_ms: u64,
1352 ) -> super::IntegrationResult<JobStatus> {
1353 self.status(id)
1354 }
1355
1356 fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
1357 self.callbacks.lock().unwrap().push(callback);
1358 }
1359 }
1360
1361 #[test]
1362 fn job_orchestration_contract_supports_enqueue_and_status() {
1363 let jobs = InMemoryJobs::default();
1364 let handle = jobs
1365 .enqueue(JobRequest::new(
1366 "sync_customer",
1367 json!({"id": 42}),
1368 "idempotent-42",
1369 ))
1370 .unwrap();
1371 let status = jobs.status(&handle.id).unwrap();
1372 assert_eq!(status.state, JobState::Queued);
1373 assert_eq!(handle.idempotency_key, "idempotent-42");
1374
1375 let ctx = QueryContext {
1376 tenant_id: Some("tenant-a".to_string()),
1377 ..QueryContext::default()
1378 };
1379 assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
1380 }
1381
1382 #[test]
1383 fn reference_singlestore_adapter_runs_typed_sql_boundary() {
1384 let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
1385 "region".to_string(),
1386 Value::String("EMEA".to_string()),
1387 )])]);
1388 let rows = adapter
1389 .run_query(
1390 SqlCommand::new("SELECT region FROM accounts", Vec::new()),
1391 &QueryContext::default(),
1392 )
1393 .unwrap();
1394 assert_eq!(rows.len(), 1);
1395 assert_eq!(
1396 rows[0].get("region"),
1397 Some(&Value::String("EMEA".to_string()))
1398 );
1399 }
1400
1401 #[test]
1402 fn reference_opensearch_adapter_filters_rows() {
1403 let rows = vec![
1404 crate::StoredRow {
1405 id: 1,
1406 data: std::collections::BTreeMap::from([(
1407 "title".to_string(),
1408 Value::String("Acme renewal".to_string()),
1409 )]),
1410 },
1411 crate::StoredRow {
1412 id: 2,
1413 data: std::collections::BTreeMap::from([(
1414 "title".to_string(),
1415 Value::String("Globex onboarding".to_string()),
1416 )]),
1417 },
1418 ];
1419 let adapter = InMemoryOpenSearchAdapter::new(rows);
1420 let response = adapter
1421 .search(
1422 SearchRequest::new("accounts", "renewal"),
1423 &QueryContext::default(),
1424 )
1425 .unwrap();
1426 assert_eq!(response.total_hits, 1);
1427 assert_eq!(response.rows[0].id, 1);
1428 }
1429
1430 #[test]
1431 fn reference_axiom_sink_records_events() {
1432 let sink = InMemoryAxiomSink::default();
1433 sink.send_event(AnalyticsEvent::new(
1434 "sales",
1435 "query_executed",
1436 json!({"latency_ms": 12}),
1437 ))
1438 .unwrap();
1439 let events = sink.events();
1440 assert_eq!(events.len(), 1);
1441 assert_eq!(events[0].name, "query_executed");
1442 }
1443
1444 #[test]
1445 fn reference_trigger_orchestrator_supports_completion_and_polling() {
1446 let orchestrator = InMemoryJobOrchestrator::default();
1447 let completed = Arc::new(Mutex::new(false));
1448 let completed_flag = completed.clone();
1449 orchestrator.register_completion_callback(Arc::new(move |status| {
1450 if status.state == JobState::Succeeded {
1451 if let Ok(mut guard) = completed_flag.lock() {
1452 *guard = true;
1453 }
1454 }
1455 }));
1456
1457 let handle = orchestrator
1458 .enqueue(JobRequest::new(
1459 "refresh_dashboard",
1460 json!({"account_id": 7}),
1461 "refresh-7",
1462 ))
1463 .unwrap();
1464 orchestrator
1465 .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
1466 .unwrap();
1467 let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
1468 assert_eq!(status.state, JobState::Succeeded);
1469 assert_eq!(
1470 status.result,
1471 Some(json!({
1472 "rows_synced": 18
1473 }))
1474 );
1475 assert!(*completed.lock().unwrap());
1476 }
1477
1478 #[test]
1479 fn query_context_helpers_encode_trace_correlation_retry_timeout() {
1480 let context = QueryContext::default()
1481 .with_correlation_id("corr-42")
1482 .with_request_id("req-42")
1483 .with_timeout_ms(900)
1484 .with_retry_policy(RetryPolicy {
1485 max_attempts: 4,
1486 initial_backoff_ms: 25,
1487 max_backoff_ms: 100,
1488 });
1489
1490 assert_eq!(
1491 context
1492 .tags
1493 .get(CONTEXT_TAG_CORRELATION_ID)
1494 .map(String::as_str),
1495 Some("corr-42")
1496 );
1497 assert_eq!(
1498 context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
1499 Some("req-42")
1500 );
1501 assert_eq!(
1502 context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
1503 Some("900")
1504 );
1505 assert_eq!(context.correlation_id(), Some("corr-42"));
1506 assert_eq!(context.request_id(), Some("req-42"));
1507 assert_eq!(context.timeout_ms(), Some(900));
1508 assert_eq!(
1509 context.retry_policy_override(),
1510 Some(RetryPolicy {
1511 max_attempts: 4,
1512 initial_backoff_ms: 25,
1513 max_backoff_ms: 100
1514 })
1515 );
1516 }
1517
1518 #[test]
1519 fn run_with_contract_respects_context_retry_override() {
1520 let context = QueryContext::default().with_retry_policy(RetryPolicy {
1521 max_attempts: 2,
1522 initial_backoff_ms: 0,
1523 max_backoff_ms: 0,
1524 });
1525 let mut attempts = 0u32;
1526 let result = run_with_contract(
1527 "opensearch",
1528 "search",
1529 AdapterCallContract::default()
1530 .with_retry_policy(RetryPolicy::never())
1531 .with_timeout_ms(1_000),
1532 &context,
1533 |attempt| {
1534 attempts = attempt;
1535 if attempt == 1 {
1536 Err(IntegrationError::new(
1537 "opensearch",
1538 IntegrationErrorKind::Transient,
1539 "temporary failure",
1540 ))
1541 } else {
1542 Ok("ok")
1543 }
1544 },
1545 )
1546 .unwrap();
1547 assert_eq!(result, "ok");
1548 assert_eq!(attempts, 2);
1549 }
1550
1551 #[test]
1552 fn run_with_contract_returns_timeout_error() {
1553 let context = QueryContext::default().with_timeout_ms(5);
1554 let err = run_with_contract(
1555 "singlestore",
1556 "run_query",
1557 AdapterCallContract::default(),
1558 &context,
1559 |_| {
1560 std::thread::sleep(std::time::Duration::from_millis(15));
1561 Ok::<_, IntegrationError>("done")
1562 },
1563 )
1564 .unwrap_err();
1565 assert_eq!(err.kind, IntegrationErrorKind::Timeout);
1566 assert_eq!(err.code.as_deref(), Some("operation_timeout"));
1567 }
1568
1569 #[test]
1570 fn axiom_bridge_enriches_events_with_trace_and_correlation() {
1571 let sink = Arc::new(InMemoryAxiomSink::default());
1572 let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
1573 let context = QueryContext {
1574 tenant_id: Some("tenant-a".to_string()),
1575 trace_id: Some("trace-123".to_string()),
1576 tags: std::collections::BTreeMap::new(),
1577 }
1578 .with_correlation_id("corr-9")
1579 .with_request_id("req-9");
1580 bridge
1581 .emit("session_event", json!({"event":"patch"}), &context)
1582 .unwrap();
1583 let events = sink.events();
1584 assert_eq!(events.len(), 1);
1585 let payload = events[0].payload.as_object().unwrap();
1586 assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
1587 assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
1588 assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
1589 }
1590
1591 #[test]
1592 fn conformance_suite_passes_with_reference_adapters() {
1593 let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
1594 [("tenant".to_string(), Value::String("north".to_string()))],
1595 )]);
1596 let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
1597 id: 1,
1598 data: std::collections::BTreeMap::from([(
1599 "title".to_string(),
1600 Value::String("Acme renewal".to_string()),
1601 )]),
1602 }]);
1603 let trigger = InMemoryJobOrchestrator::default();
1604 let analytics = InMemoryAxiomSink::default();
1605 let report = run_adapter_conformance_suite(
1606 &singlestore,
1607 &opensearch,
1608 &trigger,
1609 &analytics,
1610 &QueryContext::default(),
1611 );
1612 assert!(report.passed(), "report={report:?}");
1613 assert!(report.checks.len() >= 4);
1614 }
1615}