1use crate::{
2 error::{DataError, DataResult},
3 query::{Query, WireFormatProfile},
4 repo::{CompactRowsPayload, Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt,
11 future::Future,
12 sync::{Arc, Mutex},
13 thread,
14 time::{Duration, Instant},
15};
16use tokio::time::sleep;
17use tracing::{info, warn};
18
// Well-known `QueryContext` tag keys. Tags are free-form strings; these keys
// are the ones the retry/timeout machinery and structured logging understand.
pub const CONTEXT_TAG_CORRELATION_ID: &str = "correlation_id";
pub const CONTEXT_TAG_REQUEST_ID: &str = "request_id";
// Per-call timeout override in milliseconds; parsed as u64 and ignored if 0
// (see `QueryContext::timeout_ms`).
pub const CONTEXT_TAG_TIMEOUT_MS: &str = "timeout_ms";
// Retry-policy override tags. `retry_max_attempts` must be present and
// parsable for the override to take effect; the backoff tags are optional
// (see `QueryContext::retry_policy_override`).
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &str = "retry_max_attempts";
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &str = "retry_initial_backoff_ms";
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &str = "retry_max_backoff_ms";
25
/// Coarse classification of an integration failure. Drives the default
/// `retryable` flag assigned by `IntegrationError::new`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    /// Transient fault; retryable by default.
    Transient,
    /// Permanent fault; not retried.
    Permanent,
    /// Authentication/authorization failure; not retried.
    Auth,
    /// Backend throttling; retryable by default.
    RateLimited,
    /// Deadline exceeded; retryable by default.
    Timeout,
    /// Backend unreachable or down; retryable by default.
    Unavailable,
    /// Caller supplied bad input; not retried.
    InvalidInput,
}
37
/// Error type shared by every integration adapter in this module.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrationError {
    /// System that produced the error (e.g. "singlestore", "axiom", "retry").
    pub source: String,
    /// Coarse failure classification; see [`IntegrationErrorKind`].
    pub kind: IntegrationErrorKind,
    /// Human-readable description.
    pub message: String,
    /// Optional machine-readable code (e.g. "operation_timeout").
    pub code: Option<String>,
    /// Whether the retry helpers may re-run the operation; derived from
    /// `kind` in `IntegrationError::new`.
    pub retryable: bool,
}
46
47impl IntegrationError {
48 pub fn new(
49 source: impl Into<String>,
50 kind: IntegrationErrorKind,
51 message: impl Into<String>,
52 ) -> Self {
53 let kind_value = kind;
54 Self {
55 source: source.into(),
56 kind: kind_value,
57 message: message.into(),
58 code: None,
59 retryable: matches!(
60 kind_value,
61 IntegrationErrorKind::Transient
62 | IntegrationErrorKind::RateLimited
63 | IntegrationErrorKind::Timeout
64 | IntegrationErrorKind::Unavailable
65 ),
66 }
67 }
68
69 pub fn with_code(mut self, code: impl Into<String>) -> Self {
70 self.code = Some(code.into());
71 self
72 }
73}
74
75impl fmt::Display for IntegrationError {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 if let Some(code) = &self.code {
78 write!(f, "[{}:{}] {}", self.source, code, self.message)
79 } else {
80 write!(f, "[{}] {}", self.source, self.message)
81 }
82 }
83}
84
impl std::error::Error for IntegrationError {}

/// Convenience alias used by all adapter traits and helpers in this module.
pub type IntegrationResult<T> = Result<T, IntegrationError>;
88
/// Exponential-backoff retry configuration consumed by the `run_with_*`
/// helper functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Total attempts including the first call; the helpers clamp this to >= 1.
    pub max_attempts: u32,
    /// Backoff before the second attempt, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound for the doubled backoff, in milliseconds.
    pub max_backoff_ms: u64,
}
95
96impl RetryPolicy {
97 pub fn conservative() -> Self {
98 Self {
99 max_attempts: 3,
100 initial_backoff_ms: 50,
101 max_backoff_ms: 500,
102 }
103 }
104
105 pub fn never() -> Self {
106 Self {
107 max_attempts: 1,
108 initial_backoff_ms: 0,
109 max_backoff_ms: 0,
110 }
111 }
112}
113
impl Default for RetryPolicy {
    /// Defaults to the conservative three-attempt policy.
    fn default() -> Self {
        Self::conservative()
    }
}
119
/// Per-call execution contract: a retry policy plus a wall-clock budget.
/// Context tags may override both at call time (see `run_with_contract`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdapterCallContract {
    pub retry_policy: RetryPolicy,
    /// Per-attempt budget in milliseconds; checked after each attempt returns.
    pub timeout_ms: u64,
}
125
126impl AdapterCallContract {
127 pub fn low_latency() -> Self {
128 Self {
129 retry_policy: RetryPolicy::never(),
130 timeout_ms: 250,
131 }
132 }
133
134 pub fn default_query() -> Self {
135 Self {
136 retry_policy: RetryPolicy::conservative(),
137 timeout_ms: 1_500,
138 }
139 }
140
141 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
142 self.retry_policy = retry_policy;
143 self
144 }
145
146 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
147 self.timeout_ms = timeout_ms.max(1);
148 self
149 }
150}
151
impl Default for AdapterCallContract {
    /// Defaults to the standard query contract (conservative retries, 1.5 s).
    fn default() -> Self {
        Self::default_query()
    }
}
157
/// Synchronously runs `operation_fn` under `contract`, retrying retryable
/// failures up to the policy's attempt budget.
///
/// Per-call overrides carried in `context` tags take precedence: a
/// tag-supplied retry policy replaces `contract.retry_policy`, and a
/// tag-supplied timeout replaces `contract.timeout_ms`.
///
/// Semantics worth noting (both visible in the code below):
/// - The timeout is enforced *after* each attempt returns, so a slow attempt
///   is never interrupted — and even a successful result is discarded with a
///   `Timeout` error when the attempt overran the budget.
/// - Unlike `run_with_contract_async`, this variant never sleeps between
///   attempts; the backoff value is only tracked for the log line.
pub fn run_with_contract<T, F>(
    source: &str,
    operation: &str,
    contract: AdapterCallContract,
    context: &QueryContext,
    mut operation_fn: F,
) -> IntegrationResult<T>
where
    F: FnMut(u32) -> IntegrationResult<T>,
{
    // Context tags may override the contract's retry policy and timeout.
    let policy = context
        .retry_policy_override()
        .unwrap_or(contract.retry_policy);
    let attempts = policy.max_attempts.max(1);
    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
    let mut backoff_ms = policy.initial_backoff_ms;

    for attempt in 1..=attempts {
        let started_at = Instant::now();
        let result = operation_fn(attempt);
        let elapsed_ms = started_at.elapsed().as_millis() as u64;
        // Post-hoc deadline check: applies to `Ok` results too (see above).
        if elapsed_ms > timeout_ms {
            return Err(IntegrationError::new(
                source,
                IntegrationErrorKind::Timeout,
                format!(
                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
                    elapsed_ms, timeout_ms
                ),
            )
            .with_code("operation_timeout"));
        }

        match result {
            Ok(value) => return Ok(value),
            // Non-retryable errors and the final attempt surface immediately.
            Err(err) if !err.retryable || attempt == attempts => return Err(err),
            Err(err) => {
                warn!(
                    target: "shelly.integration.query",
                    source,
                    operation,
                    attempt,
                    max_attempts = attempts,
                    tenant_id = ?context.tenant_id,
                    trace_id = ?context.trace_id,
                    correlation_id = context.correlation_id().unwrap_or("-"),
                    request_id = context.request_id().unwrap_or("-"),
                    timeout_ms,
                    backoff_ms,
                    error = %err,
                    "Shelly integration retrying transient failure without blocking sleep; use run_with_contract_async for non-blocking backoff"
                );
                // Exponential growth (doubling), clamped to max_backoff_ms.
                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
            }
        }
    }

    // Unreachable in practice (the final attempt always returns above), but
    // kept so every control path yields a value.
    Err(IntegrationError::new(
        source,
        IntegrationErrorKind::Transient,
        format!("{operation} retry exhausted"),
    )
    .with_code("retry_exhausted"))
}
222
223pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
224where
225 F: FnMut(u32) -> IntegrationResult<T>,
226{
227 let attempts = policy.max_attempts.max(1);
228 let mut backoff_ms = policy.initial_backoff_ms;
229
230 for attempt in 1..=attempts {
231 match operation(attempt) {
232 Ok(value) => return Ok(value),
233 Err(err) if !err.retryable || attempt == attempts => return Err(err),
234 Err(_) => {
235 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
236 }
237 }
238 }
239
240 Err(IntegrationError::new(
241 "retry",
242 IntegrationErrorKind::Transient,
243 "retry exhausted",
244 ))
245}
246
/// Async counterpart of `run_with_contract`: same override/timeout/retry
/// semantics, but sleeps (non-blocking, via `tokio::time::sleep`) between
/// retryable failures.
///
/// As in the sync variant, the timeout is checked *after* an attempt
/// completes — a slow attempt is not cancelled, and even an `Ok` result is
/// discarded with a `Timeout` error when it overran the budget.
pub async fn run_with_contract_async<T, F, Fut>(
    source: &str,
    operation: &str,
    contract: AdapterCallContract,
    context: &QueryContext,
    mut operation_fn: F,
) -> IntegrationResult<T>
where
    F: FnMut(u32) -> Fut,
    Fut: Future<Output = IntegrationResult<T>>,
{
    // Context tags may override the contract's retry policy and timeout.
    let policy = context
        .retry_policy_override()
        .unwrap_or(contract.retry_policy);
    let attempts = policy.max_attempts.max(1);
    let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
    let mut backoff_ms = policy.initial_backoff_ms;

    for attempt in 1..=attempts {
        let started_at = Instant::now();
        let result = operation_fn(attempt).await;
        let elapsed_ms = started_at.elapsed().as_millis() as u64;
        // Post-hoc deadline check: applies to `Ok` results too (see above).
        if elapsed_ms > timeout_ms {
            return Err(IntegrationError::new(
                source,
                IntegrationErrorKind::Timeout,
                format!(
                    "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
                    elapsed_ms, timeout_ms
                ),
            )
            .with_code("operation_timeout"));
        }

        match result {
            Ok(value) => return Ok(value),
            // Non-retryable errors and the final attempt surface immediately.
            Err(err) if !err.retryable || attempt == attempts => return Err(err),
            Err(err) => {
                warn!(
                    target: "shelly.integration.query",
                    source,
                    operation,
                    attempt,
                    max_attempts = attempts,
                    tenant_id = ?context.tenant_id,
                    trace_id = ?context.trace_id,
                    correlation_id = context.correlation_id().unwrap_or("-"),
                    request_id = context.request_id().unwrap_or("-"),
                    timeout_ms,
                    backoff_ms,
                    error = %err,
                    "Shelly integration retrying transient failure with non-blocking backoff"
                );
                // Non-blocking backoff, then exponential growth with clamp.
                if backoff_ms > 0 {
                    sleep(Duration::from_millis(backoff_ms)).await;
                }
                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
            }
        }
    }

    // Unreachable in practice (the final attempt always returns above), but
    // kept so every control path yields a value.
    Err(IntegrationError::new(
        source,
        IntegrationErrorKind::Transient,
        format!("{operation} retry exhausted"),
    )
    .with_code("retry_exhausted"))
}
315
316pub async fn run_with_retry_async<T, F, Fut>(
317 policy: RetryPolicy,
318 mut operation: F,
319) -> IntegrationResult<T>
320where
321 F: FnMut(u32) -> Fut,
322 Fut: Future<Output = IntegrationResult<T>>,
323{
324 let attempts = policy.max_attempts.max(1);
325 let mut backoff_ms = policy.initial_backoff_ms;
326
327 for attempt in 1..=attempts {
328 match operation(attempt).await {
329 Ok(value) => return Ok(value),
330 Err(err) if !err.retryable || attempt == attempts => return Err(err),
331 Err(_) => {
332 if backoff_ms > 0 {
333 sleep(Duration::from_millis(backoff_ms)).await;
334 }
335 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
336 }
337 }
338 }
339
340 Err(IntegrationError::new(
341 "retry",
342 IntegrationErrorKind::Transient,
343 "retry exhausted",
344 ))
345}
346
/// Optional callbacks fired around connection setup/teardown.
/// Both methods default to no-op successes.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Called when a connection is established; default is a no-op.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    /// Called when a connection is torn down; default is a no-op.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}

/// Hook registry contract: register hooks, then fan connect/disconnect
/// notifications out to all of them.
pub trait ConnectionLifecycle {
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    fn connect(&self) -> IntegrationResult<()>;
    fn disconnect(&self) -> IntegrationResult<()>;
}
362
/// Default `ConnectionLifecycle` implementation: an ordered list of hooks.
#[derive(Default)]
pub struct LifecycleHooks {
    // Hooks are invoked in registration order; the first error aborts the
    // fan-out (see the `ConnectionLifecycle` impl below).
    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
}

impl LifecycleHooks {
    /// Creates an empty hook registry.
    pub fn new() -> Self {
        Self::default()
    }
}
373
374impl ConnectionLifecycle for LifecycleHooks {
375 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
376 self.hooks.push(hook);
377 }
378
379 fn connect(&self) -> IntegrationResult<()> {
380 for hook in &self.hooks {
381 hook.on_connect()?;
382 }
383 Ok(())
384 }
385
386 fn disconnect(&self) -> IntegrationResult<()> {
387 for hook in &self.hooks {
388 hook.on_disconnect()?;
389 }
390 Ok(())
391 }
392}
393
/// Ambient metadata for a single integration call: tenant and trace ids plus
/// free-form string tags. Some tag keys are well-known and interpreted by the
/// retry/timeout machinery (see the `CONTEXT_TAG_*` constants).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    pub tenant_id: Option<String>,
    pub trace_id: Option<String>,
    /// BTreeMap keeps tags in deterministic (sorted) order.
    pub tags: BTreeMap<String, String>,
}
400
401impl QueryContext {
402 pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
403 let tenant_id = tenant_id.into();
404 let tenant_id = tenant_id.trim();
405 if tenant_id.is_empty() {
406 self.tenant_id = None;
407 } else {
408 self.tenant_id = Some(tenant_id.to_string());
409 }
410 self
411 }
412
413 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
414 self.tags.insert(key.into(), value.into());
415 self
416 }
417
418 pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
419 self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
420 }
421
422 pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
423 self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
424 }
425
426 pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
427 self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
428 }
429
430 pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
431 self.with_tag(
432 CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
433 policy.max_attempts.to_string(),
434 )
435 .with_tag(
436 CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
437 policy.initial_backoff_ms.to_string(),
438 )
439 .with_tag(
440 CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
441 policy.max_backoff_ms.to_string(),
442 )
443 }
444
445 pub fn correlation_id(&self) -> Option<&str> {
446 self.tags
447 .get(CONTEXT_TAG_CORRELATION_ID)
448 .map(String::as_str)
449 .map(str::trim)
450 .filter(|value| !value.is_empty())
451 }
452
453 pub fn request_id(&self) -> Option<&str> {
454 self.tags
455 .get(CONTEXT_TAG_REQUEST_ID)
456 .map(String::as_str)
457 .map(str::trim)
458 .filter(|value| !value.is_empty())
459 }
460
461 pub fn timeout_ms(&self) -> Option<u64> {
462 self.tags
463 .get(CONTEXT_TAG_TIMEOUT_MS)
464 .and_then(|value| value.parse::<u64>().ok())
465 .filter(|value| *value > 0)
466 }
467
468 pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
469 let max_attempts = self
470 .tags
471 .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
472 .and_then(|value| value.parse::<u32>().ok())?;
473 let initial_backoff_ms = self
474 .tags
475 .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
476 .and_then(|value| value.parse::<u64>().ok())
477 .unwrap_or(0);
478 let max_backoff_ms = self
479 .tags
480 .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
481 .and_then(|value| value.parse::<u64>().ok())
482 .unwrap_or(initial_backoff_ms);
483 Some(RetryPolicy {
484 max_attempts: max_attempts.max(1),
485 initial_backoff_ms,
486 max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
487 })
488 }
489}
490
/// Typed request/response boundary that every concrete adapter implements.
pub trait TypedQueryBoundary {
    type Request: Send + Sync;
    type Response: Send + Sync;

    /// Executes one request, surfacing failures as `IntegrationError`.
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
501
/// A SQL statement plus positional JSON-encoded parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlCommand {
    pub statement: String,
    pub params: Vec<Value>,
}

impl SqlCommand {
    /// Builds a command from any string-like statement.
    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
        Self {
            statement: statement.into(),
            params,
        }
    }
}
516
/// Request for one page ("window") of a high-volume result set.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowRequest {
    pub dataset: String,
    /// Zero-based row offset into the full result set.
    pub offset: usize,
    /// Page size; the constructor clamps this to >= 1.
    pub limit: usize,
    /// Token identifying the originating query, when resuming.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_token: Option<String>,
    /// Token identifying this specific window, when resuming.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub window_token: Option<String>,
    /// Row encoding requested for the response (JSON rows vs compact payload).
    #[serde(default)]
    pub wire_format: WireFormatProfile,
}
529
530impl DataWindowRequest {
531 pub fn new(dataset: impl Into<String>, offset: usize, limit: usize) -> Self {
532 Self {
533 dataset: dataset.into(),
534 offset,
535 limit: limit.max(1),
536 query_token: None,
537 window_token: None,
538 wire_format: WireFormatProfile::Json,
539 }
540 }
541
542 pub fn with_query_token(mut self, query_token: impl Into<String>) -> Self {
543 self.query_token = Some(query_token.into());
544 self
545 }
546
547 pub fn with_window_token(mut self, window_token: impl Into<String>) -> Self {
548 self.window_token = Some(window_token.into());
549 self
550 }
551
552 pub fn compact(mut self) -> Self {
553 self.wire_format = WireFormatProfile::Compact;
554 self
555 }
556}
557
/// One materialized window of rows plus the tokens needed to fetch more.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataWindowResponse {
    pub dataset: String,
    pub offset: usize,
    pub limit: usize,
    /// Size of the full (unpaged) result set.
    pub total_rows: usize,
    pub query_token: String,
    pub window_token: String,
    pub rows: Vec<Row>,
    /// Present only when the request asked for the compact wire format.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compact_rows: Option<CompactRowsPayload>,
}
570
571pub trait SingleStoreAdapter:
572 TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
573{
574 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
575 let started_at = Instant::now();
576 let statement = query.statement.as_str();
577 let param_count = query.params.len();
578 let result = self.execute(&query, context);
579 match &result {
580 Ok(rows) => info!(
581 target: "shelly.integration.query",
582 source = "singlestore",
583 operation = "run_query",
584 tenant_id = ?context.tenant_id,
585 trace_id = ?context.trace_id,
586 tag_count = context.tags.len(),
587 statement,
588 param_count,
589 row_count = rows.len(),
590 duration_ms = started_at.elapsed().as_millis() as u64,
591 "Shelly integration query executed"
592 ),
593 Err(err) => warn!(
594 target: "shelly.integration.query",
595 source = "singlestore",
596 operation = "run_query",
597 tenant_id = ?context.tenant_id,
598 trace_id = ?context.trace_id,
599 tag_count = context.tags.len(),
600 statement,
601 param_count,
602 duration_ms = started_at.elapsed().as_millis() as u64,
603 error = %err,
604 "Shelly integration query failed"
605 ),
606 }
607 result
608 }
609
610 fn run_high_volume_window_query(
611 &self,
612 query: SqlCommand,
613 request: DataWindowRequest,
614 context: &QueryContext,
615 ) -> IntegrationResult<DataWindowResponse> {
616 let rows = self.run_query(query, context)?;
617 Ok(materialize_window_response(
618 "singlestore",
619 request,
620 rows,
621 None,
622 ))
623 }
624}
625
/// ClickHouse-facing query boundary. The default methods wrap
/// `TypedQueryBoundary::execute` with structured logging and window
/// materialization.
pub trait ClickHouseAdapter:
    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
{
    /// Executes `query` and logs outcome, timing, and context identifiers.
    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
        let started_at = Instant::now();
        let statement = query.statement.as_str();
        let param_count = query.params.len();
        let result = self.execute(&query, context);
        // Log success and failure symmetrically; missing ids render as "-".
        match &result {
            Ok(rows) => info!(
                target: "shelly.integration.query",
                source = "clickhouse",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                correlation_id = context.correlation_id().unwrap_or("-"),
                request_id = context.request_id().unwrap_or("-"),
                tag_count = context.tags.len(),
                statement,
                param_count,
                row_count = rows.len(),
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "clickhouse",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                correlation_id = context.correlation_id().unwrap_or("-"),
                request_id = context.request_id().unwrap_or("-"),
                tag_count = context.tags.len(),
                statement,
                param_count,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }

    /// Runs the query, then shapes the rows into a paged window response.
    fn run_high_volume_window_query(
        &self,
        query: SqlCommand,
        request: DataWindowRequest,
        context: &QueryContext,
    ) -> IntegrationResult<DataWindowResponse> {
        let rows = self.run_query(query, context)?;
        Ok(materialize_window_response(
            "clickhouse",
            request,
            rows,
            None,
        ))
    }
}
684
/// BigQuery-facing query boundary. The default methods wrap
/// `TypedQueryBoundary::execute` with structured logging and window
/// materialization.
pub trait BigQueryAdapter:
    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> + Send + Sync
{
    /// Executes `query` and logs outcome, timing, and context identifiers.
    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
        let started_at = Instant::now();
        let statement = query.statement.as_str();
        let param_count = query.params.len();
        let result = self.execute(&query, context);
        // Log success and failure symmetrically; missing ids render as "-".
        match &result {
            Ok(rows) => info!(
                target: "shelly.integration.query",
                source = "bigquery",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                correlation_id = context.correlation_id().unwrap_or("-"),
                request_id = context.request_id().unwrap_or("-"),
                tag_count = context.tags.len(),
                statement,
                param_count,
                row_count = rows.len(),
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "bigquery",
                operation = "run_query",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                correlation_id = context.correlation_id().unwrap_or("-"),
                request_id = context.request_id().unwrap_or("-"),
                tag_count = context.tags.len(),
                statement,
                param_count,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }

    /// Runs the query, then shapes the rows into a paged window response.
    fn run_high_volume_window_query(
        &self,
        query: SqlCommand,
        request: DataWindowRequest,
        context: &QueryContext,
    ) -> IntegrationResult<DataWindowResponse> {
        let rows = self.run_query(query, context)?;
        Ok(materialize_window_response("bigquery", request, rows, None))
    }
}
738
/// Full-text search request with 1-based pagination.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    pub index: String,
    /// Free-text query string.
    pub text: String,
    pub filters: Vec<crate::query::Filter>,
    /// 1-based page number.
    pub page: usize,
    pub per_page: usize,
}
747
748impl SearchRequest {
749 pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
750 Self {
751 index: index.into(),
752 text: text.into(),
753 filters: Vec::new(),
754 page: 1,
755 per_page: 25,
756 }
757 }
758
759 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
760 self.filters.push(filter);
761 self
762 }
763}
764
/// One page of search results: total hit count plus this page's rows.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Hits across the whole result set, not just this page.
    pub total_hits: usize,
    pub rows: Vec<StoredRow>,
}
770
/// OpenSearch-facing search boundary. The default methods wrap
/// `TypedQueryBoundary::execute` with structured logging and offset/limit
/// window adaptation.
pub trait OpenSearchAdapter:
    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse> + Send + Sync
{
    /// Executes the search and logs outcome, timing, and request shape.
    fn search(
        &self,
        request: SearchRequest,
        context: &QueryContext,
    ) -> IntegrationResult<SearchResponse> {
        let started_at = Instant::now();
        let index = request.index.as_str();
        let text = request.text.as_str();
        let filter_count = request.filters.len();
        let page = request.page;
        let per_page = request.per_page;
        let result = self.execute(&request, context);
        match &result {
            Ok(response) => info!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                row_count = response.rows.len(),
                total_hits = response.total_hits,
                duration_ms = started_at.elapsed().as_millis() as u64,
                "Shelly integration query executed"
            ),
            Err(err) => warn!(
                target: "shelly.integration.query",
                source = "opensearch",
                operation = "search",
                tenant_id = ?context.tenant_id,
                trace_id = ?context.trace_id,
                tag_count = context.tags.len(),
                index,
                text,
                filter_count,
                page,
                per_page,
                duration_ms = started_at.elapsed().as_millis() as u64,
                error = %err,
                "Shelly integration query failed"
            ),
        }
        result
    }

    /// Adapts an offset/limit window onto the page-based search API and
    /// shapes the result into a `DataWindowResponse`.
    ///
    /// Tokens are taken from the window request when present; otherwise they
    /// are synthesized from the dataset, hit count, offset, and row count.
    fn search_window(
        &self,
        mut request: SearchRequest,
        window: DataWindowRequest,
        context: &QueryContext,
    ) -> IntegrationResult<DataWindowResponse> {
        let limit = window.limit.max(1);
        // Convert the zero-based offset to a 1-based page index. Note that an
        // offset that is not a multiple of `limit` is rounded down to a page
        // boundary by the integer division.
        request.page = (window.offset / limit).saturating_add(1);
        request.per_page = limit;
        let response = self.search(request, context)?;
        // Strip storage metadata; the window response carries bare row data.
        let rows = response
            .rows
            .into_iter()
            .map(|row| row.data)
            .collect::<Vec<_>>();
        let query_token = window
            .query_token
            .unwrap_or_else(|| format!("opensearch:{}:{}", window.dataset, response.total_hits));
        let window_token = window.window_token.unwrap_or_else(|| {
            format!(
                "{query_token}:offset={}:limit={limit}:rows={}",
                window.offset,
                rows.len()
            )
        });
        // Compact encoding is produced only when explicitly requested.
        let compact_rows =
            (window.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
        Ok(DataWindowResponse {
            dataset: window.dataset,
            offset: window.offset,
            limit,
            total_rows: response.total_hits,
            query_token,
            window_token,
            rows,
            compact_rows,
        })
    }
}
863
/// Destination for analytics events (e.g. an Axiom-backed sink).
pub trait AnalyticsSink: Send + Sync {
    /// Delivers a single event; transport/buffering is implementation-defined.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}

/// A single analytics event: a namespaced name plus an arbitrary JSON payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AnalyticsEvent {
    pub namespace: String,
    pub name: String,
    pub payload: Value,
}
874
875impl AnalyticsEvent {
876 pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
877 Self {
878 namespace: namespace.into(),
879 name: name.into(),
880 payload,
881 }
882 }
883}
884
/// Request to run a workflow job. The `idempotency_key` lets orchestrators
/// de-duplicate repeated submissions.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobRequest {
    pub workflow: String,
    pub payload: Value,
    pub idempotency_key: String,
    /// Free-form metadata; BTreeMap keeps keys in deterministic order.
    pub metadata: BTreeMap<String, String>,
}
892
893impl JobRequest {
894 pub fn new(
895 workflow: impl Into<String>,
896 payload: Value,
897 idempotency_key: impl Into<String>,
898 ) -> Self {
899 Self {
900 workflow: workflow.into(),
901 payload,
902 idempotency_key: idempotency_key.into(),
903 metadata: BTreeMap::new(),
904 }
905 }
906}
907
/// Lifecycle states of an orchestrated job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}

/// Point-in-time snapshot of a job.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    pub id: String,
    pub state: JobState,
    /// Number of execution attempts so far.
    pub attempts: u32,
    /// Result payload; set on success, cleared on failure (see
    /// `InMemoryJobOrchestrator::mark_succeeded` / `mark_failed`).
    pub result: Option<Value>,
    /// Failure details; set on failure, cleared on success.
    pub error: Option<IntegrationError>,
}

/// Handle returned when a job is enqueued, sufficient to poll it later.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    pub id: String,
    pub workflow: String,
    pub idempotency_key: String,
}

/// Observer invoked with a job's status after state transitions.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
935
/// Job queue contract: enqueue work, inspect status, poll until settled, and
/// register completion observers.
pub trait JobOrchestrator: Send + Sync {
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Checks status up to `attempts` times with `backoff_ms` pacing between
    /// checks; exact pacing is implementation-defined.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}

/// Trigger.dev-flavored facade over a job orchestrator. Satisfied
/// automatically by every `JobOrchestrator` via the blanket impl below,
/// which delegates each method one-to-one.
pub trait TriggerDevAdapter: Send + Sync {
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus>;
}
953
/// Blanket implementation: every `JobOrchestrator` is a `TriggerDevAdapter`
/// by pure delegation — no additional behavior.
impl<T> TriggerDevAdapter for T
where
    T: JobOrchestrator + Send + Sync,
{
    // Delegates to `JobOrchestrator::enqueue`.
    fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
        self.enqueue(request)
    }

    // Delegates to `JobOrchestrator::status`.
    fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
        self.status(id)
    }

    // Delegates to `JobOrchestrator::poll`.
    fn poll_workflow(
        &self,
        id: &str,
        attempts: u32,
        backoff_ms: u64,
    ) -> IntegrationResult<JobStatus> {
        self.poll(id, attempts, backoff_ms)
    }
}
975
/// Forwards context-enriched telemetry events to an `AnalyticsSink` under a
/// fixed namespace.
#[derive(Clone)]
pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
    sink: Arc<S>,
    /// Namespace stamped onto every emitted event.
    namespace: String,
}
981
982impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
983 pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
984 Self {
985 sink,
986 namespace: namespace.into(),
987 }
988 }
989
990 pub fn emit(
991 &self,
992 name: impl Into<String>,
993 payload: Value,
994 context: &QueryContext,
995 ) -> IntegrationResult<()> {
996 let mut envelope = serde_json::Map::new();
997 envelope.insert("payload".to_string(), payload);
998 envelope.insert(
999 "tenant_id".to_string(),
1000 context
1001 .tenant_id
1002 .as_ref()
1003 .map(|value| Value::String(value.clone()))
1004 .unwrap_or(Value::Null),
1005 );
1006 envelope.insert(
1007 "trace_id".to_string(),
1008 context
1009 .trace_id
1010 .as_ref()
1011 .map(|value| Value::String(value.clone()))
1012 .unwrap_or(Value::Null),
1013 );
1014 envelope.insert(
1015 "correlation_id".to_string(),
1016 context
1017 .correlation_id()
1018 .map(|value| Value::String(value.to_string()))
1019 .unwrap_or(Value::Null),
1020 );
1021 envelope.insert(
1022 "request_id".to_string(),
1023 context
1024 .request_id()
1025 .map(|value| Value::String(value.to_string()))
1026 .unwrap_or(Value::Null),
1027 );
1028 envelope.insert("tags".to_string(), serde_json::json!(context.tags));
1029
1030 self.sink.send_event(AnalyticsEvent::new(
1031 self.namespace.clone(),
1032 name.into(),
1033 Value::Object(envelope),
1034 ))
1035 }
1036}
1037
/// Test/demo SingleStore adapter that answers every query with a fixed row set.
#[derive(Debug, Clone, Default)]
pub struct InMemorySingleStoreAdapter {
    rows: Vec<Row>,
}

impl InMemorySingleStoreAdapter {
    /// Creates an adapter that returns `rows` for any statement.
    pub fn new(rows: Vec<Row>) -> Self {
        Self { rows }
    }
}
1048
1049impl TypedQueryBoundary for InMemorySingleStoreAdapter {
1050 type Request = SqlCommand;
1051 type Response = Vec<Row>;
1052
1053 fn execute(
1054 &self,
1055 request: &Self::Request,
1056 _context: &QueryContext,
1057 ) -> IntegrationResult<Self::Response> {
1058 if request.statement.trim().is_empty() {
1059 return Err(IntegrationError::new(
1060 "singlestore",
1061 IntegrationErrorKind::InvalidInput,
1062 "empty SQL statement",
1063 )
1064 .with_code("empty_statement"));
1065 }
1066 Ok(self.rows.clone())
1067 }
1068}
1069
1070impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
1071
/// Test/demo ClickHouse adapter that answers every query with a fixed row set.
#[derive(Debug, Clone, Default)]
pub struct InMemoryClickHouseAdapter {
    rows: Vec<Row>,
}

impl InMemoryClickHouseAdapter {
    /// Creates an adapter that returns `rows` for any statement.
    pub fn new(rows: Vec<Row>) -> Self {
        Self { rows }
    }
}
1082
1083impl TypedQueryBoundary for InMemoryClickHouseAdapter {
1084 type Request = SqlCommand;
1085 type Response = Vec<Row>;
1086
1087 fn execute(
1088 &self,
1089 request: &Self::Request,
1090 _context: &QueryContext,
1091 ) -> IntegrationResult<Self::Response> {
1092 if request.statement.trim().is_empty() {
1093 return Err(IntegrationError::new(
1094 "clickhouse",
1095 IntegrationErrorKind::InvalidInput,
1096 "empty SQL statement",
1097 )
1098 .with_code("empty_statement"));
1099 }
1100 Ok(self.rows.clone())
1101 }
1102}
1103
1104impl ClickHouseAdapter for InMemoryClickHouseAdapter {}
1105
/// Test/demo BigQuery adapter that answers every query with a fixed row set.
#[derive(Debug, Clone, Default)]
pub struct InMemoryBigQueryAdapter {
    rows: Vec<Row>,
}

impl InMemoryBigQueryAdapter {
    /// Creates an adapter that returns `rows` for any statement.
    pub fn new(rows: Vec<Row>) -> Self {
        Self { rows }
    }
}
1116
1117impl TypedQueryBoundary for InMemoryBigQueryAdapter {
1118 type Request = SqlCommand;
1119 type Response = Vec<Row>;
1120
1121 fn execute(
1122 &self,
1123 request: &Self::Request,
1124 _context: &QueryContext,
1125 ) -> IntegrationResult<Self::Response> {
1126 if request.statement.trim().is_empty() {
1127 return Err(IntegrationError::new(
1128 "bigquery",
1129 IntegrationErrorKind::InvalidInput,
1130 "empty SQL statement",
1131 )
1132 .with_code("empty_statement"));
1133 }
1134 Ok(self.rows.clone())
1135 }
1136}
1137
1138impl BigQueryAdapter for InMemoryBigQueryAdapter {}
1139
/// Test/demo OpenSearch adapter backed by a fixed set of stored rows.
#[derive(Debug, Clone, Default)]
pub struct InMemoryOpenSearchAdapter {
    rows: Vec<StoredRow>,
}

impl InMemoryOpenSearchAdapter {
    /// Creates an adapter that searches over `rows`.
    pub fn new(rows: Vec<StoredRow>) -> Self {
        Self { rows }
    }
}
1150
1151impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
1152 type Request = SearchRequest;
1153 type Response = SearchResponse;
1154
1155 fn execute(
1156 &self,
1157 request: &Self::Request,
1158 _context: &QueryContext,
1159 ) -> IntegrationResult<Self::Response> {
1160 if request.index.trim().is_empty() {
1161 return Err(IntegrationError::new(
1162 "opensearch",
1163 IntegrationErrorKind::InvalidInput,
1164 "search index must not be empty",
1165 )
1166 .with_code("empty_index"));
1167 }
1168 let needle = request.text.trim().to_lowercase();
1169 let mut filtered = self
1170 .rows
1171 .iter()
1172 .filter(|row| {
1173 if needle.is_empty() {
1174 return true;
1175 }
1176 row.data.values().any(|value| {
1177 value
1178 .as_str()
1179 .map(|text| text.to_lowercase().contains(&needle))
1180 .unwrap_or(false)
1181 })
1182 })
1183 .cloned()
1184 .collect::<Vec<_>>();
1185 let total_hits = filtered.len();
1186 let page = request.page.max(1);
1187 let per_page = request.per_page.max(1);
1188 let offset = (page - 1) * per_page;
1189 filtered = filtered.into_iter().skip(offset).take(per_page).collect();
1190 Ok(SearchResponse {
1191 total_hits,
1192 rows: filtered,
1193 })
1194 }
1195}
1196
1197impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
1198
/// In-memory `AnalyticsSink` that records events behind a mutex; cloning the
/// sink shares the same event log (the `Arc` is cloned, not the data).
#[derive(Debug, Default, Clone)]
pub struct InMemoryAxiomSink {
    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
}

impl InMemoryAxiomSink {
    /// Snapshot of all recorded events; returns an empty vec if the lock was
    /// poisoned.
    pub fn events(&self) -> Vec<AnalyticsEvent> {
        self.events
            .lock()
            .map(|events| events.clone())
            .unwrap_or_default()
    }
}
1212
1213impl AnalyticsSink for InMemoryAxiomSink {
1214 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
1215 let started_at = Instant::now();
1216 let namespace = event.namespace.clone();
1217 let name = event.name.clone();
1218 self.events
1219 .lock()
1220 .map_err(|_| {
1221 IntegrationError::new(
1222 "axiom",
1223 IntegrationErrorKind::Unavailable,
1224 "analytics sink lock poisoned",
1225 )
1226 })?
1227 .push(event);
1228 info!(
1229 target: "shelly.integration.query",
1230 source = "axiom",
1231 operation = "send_event",
1232 namespace,
1233 event_name = name,
1234 duration_ms = started_at.elapsed().as_millis() as u64,
1235 "Shelly integration call executed"
1236 );
1237 Ok(())
1238 }
1239}
1240
/// In-memory reference implementation of `JobOrchestrator`.
#[derive(Default)]
pub struct InMemoryJobOrchestrator {
    // Job id -> latest status.
    statuses: Mutex<HashMap<String, JobStatus>>,
    // Callbacks invoked on every status transition (see `notify`).
    callbacks: Mutex<Vec<JobCompletionCallback>>,
    // Monotonic counter backing generated "job-N" ids.
    next_id: Mutex<u64>,
}
1247
impl InMemoryJobOrchestrator {
    /// Transitions the job to `Succeeded`, stores its result, and notifies
    /// registered completion callbacks.
    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
        let status = self.with_status_mut(id, |status| {
            status.state = JobState::Succeeded;
            status.result = Some(result);
            status.error = None;
        })?;
        self.notify(&status);
        Ok(())
    }

    /// Transitions the job to `Failed`, stores the error, and notifies
    /// registered completion callbacks.
    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
        let status = self.with_status_mut(id, |status| {
            status.state = JobState::Failed;
            status.result = None;
            status.error = Some(error);
        })?;
        self.notify(&status);
        Ok(())
    }

    /// Applies `update` to the stored status for `id` and returns a clone of
    /// the updated status.
    ///
    /// Each successful call bumps `attempts` (saturating), so the counter
    /// tracks status transitions. Errors: `Unavailable` on a poisoned lock,
    /// `InvalidInput`/`job_not_found` for unknown ids.
    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
    where
        F: FnOnce(&mut JobStatus),
    {
        let mut statuses = self.statuses.lock().map_err(|_| {
            IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "job status lock poisoned",
            )
        })?;
        let Some(status) = statuses.get_mut(id) else {
            return Err(IntegrationError::new(
                "trigger",
                IntegrationErrorKind::InvalidInput,
                "job not found",
            )
            .with_code("job_not_found"));
        };
        status.attempts = status.attempts.saturating_add(1);
        update(status);
        Ok(status.clone())
    }

    /// Best-effort fan-out of a status to all registered callbacks; a
    /// poisoned callbacks lock is silently skipped. Callers invoke this after
    /// the statuses lock is released, so callbacks may re-enter safely.
    fn notify(&self, status: &JobStatus) {
        if let Ok(callbacks) = self.callbacks.lock() {
            for callback in callbacks.iter() {
                callback(status);
            }
        }
    }

    /// True for states that will never change again.
    fn is_terminal(state: JobState) -> bool {
        matches!(
            state,
            JobState::Succeeded | JobState::Failed | JobState::Canceled
        )
    }
}
1308
1309impl JobOrchestrator for InMemoryJobOrchestrator {
1310 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
1311 let started_at = Instant::now();
1312 let workflow = request.workflow.clone();
1313 let idempotency_key = request.idempotency_key.clone();
1314 let mut next_id = self.next_id.lock().map_err(|_| {
1315 IntegrationError::new(
1316 "trigger",
1317 IntegrationErrorKind::Unavailable,
1318 "job id lock poisoned",
1319 )
1320 })?;
1321 *next_id = next_id.saturating_add(1);
1322 let id = format!("job-{next_id}");
1323 let status = JobStatus {
1324 id: id.clone(),
1325 state: JobState::Queued,
1326 attempts: 0,
1327 result: None,
1328 error: None,
1329 };
1330 self.statuses
1331 .lock()
1332 .map_err(|_| {
1333 IntegrationError::new(
1334 "trigger",
1335 IntegrationErrorKind::Unavailable,
1336 "job status lock poisoned",
1337 )
1338 })?
1339 .insert(id.clone(), status);
1340 let handle = JobHandle {
1341 id,
1342 workflow: request.workflow,
1343 idempotency_key: request.idempotency_key,
1344 };
1345 info!(
1346 target: "shelly.integration.query",
1347 source = "trigger",
1348 operation = "enqueue",
1349 workflow,
1350 idempotency_key,
1351 job_id = handle.id.as_str(),
1352 duration_ms = started_at.elapsed().as_millis() as u64,
1353 "Shelly integration call executed"
1354 );
1355 Ok(handle)
1356 }
1357
1358 fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
1359 let started_at = Instant::now();
1360 let result = self
1361 .statuses
1362 .lock()
1363 .map_err(|_| {
1364 IntegrationError::new(
1365 "trigger",
1366 IntegrationErrorKind::Unavailable,
1367 "job status lock poisoned",
1368 )
1369 })?
1370 .get(id)
1371 .cloned()
1372 .ok_or_else(|| {
1373 IntegrationError::new(
1374 "trigger",
1375 IntegrationErrorKind::InvalidInput,
1376 "job not found",
1377 )
1378 .with_code("job_not_found")
1379 });
1380 match &result {
1381 Ok(status) => info!(
1382 target: "shelly.integration.query",
1383 source = "trigger",
1384 operation = "status",
1385 job_id = id,
1386 job_state = ?status.state,
1387 attempts = status.attempts,
1388 duration_ms = started_at.elapsed().as_millis() as u64,
1389 "Shelly integration call executed"
1390 ),
1391 Err(err) => warn!(
1392 target: "shelly.integration.query",
1393 source = "trigger",
1394 operation = "status",
1395 job_id = id,
1396 duration_ms = started_at.elapsed().as_millis() as u64,
1397 error = %err,
1398 "Shelly integration call failed"
1399 ),
1400 }
1401 result
1402 }
1403
1404 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1405 let started_at = Instant::now();
1406 let attempts = attempts.max(1);
1407 let mut polled = 0u32;
1408 for current in 1..=attempts {
1409 let status = self.status(id)?;
1410 polled = current;
1411 if Self::is_terminal(status.state) || current == attempts {
1412 info!(
1413 target: "shelly.integration.query",
1414 source = "trigger",
1415 operation = "poll",
1416 job_id = id,
1417 attempts = polled,
1418 backoff_ms,
1419 terminal = Self::is_terminal(status.state),
1420 job_state = ?status.state,
1421 duration_ms = started_at.elapsed().as_millis() as u64,
1422 "Shelly integration call executed"
1423 );
1424 return Ok(status);
1425 }
1426 if backoff_ms > 0 {
1427 thread::sleep(Duration::from_millis(backoff_ms));
1428 }
1429 }
1430
1431 let result = self.status(id);
1432 match &result {
1433 Ok(status) => info!(
1434 target: "shelly.integration.query",
1435 source = "trigger",
1436 operation = "poll",
1437 job_id = id,
1438 attempts = polled,
1439 backoff_ms,
1440 terminal = Self::is_terminal(status.state),
1441 job_state = ?status.state,
1442 duration_ms = started_at.elapsed().as_millis() as u64,
1443 "Shelly integration call executed"
1444 ),
1445 Err(err) => warn!(
1446 target: "shelly.integration.query",
1447 source = "trigger",
1448 operation = "poll",
1449 job_id = id,
1450 attempts = polled,
1451 backoff_ms,
1452 duration_ms = started_at.elapsed().as_millis() as u64,
1453 error = %err,
1454 "Shelly integration call failed"
1455 ),
1456 }
1457 result
1458 }
1459
1460 fn register_completion_callback(&self, callback: JobCompletionCallback) {
1461 if let Ok(mut callbacks) = self.callbacks.lock() {
1462 callbacks.push(callback);
1463 info!(
1464 target: "shelly.integration.query",
1465 source = "trigger",
1466 operation = "register_completion_callback",
1467 callback_count = callbacks.len(),
1468 "Shelly integration callback registered"
1469 );
1470 } else {
1471 warn!(
1472 target: "shelly.integration.query",
1473 source = "trigger",
1474 operation = "register_completion_callback",
1475 "Shelly integration callback registration failed"
1476 );
1477 }
1478 }
1479}
1480
/// Outcome of a single conformance probe.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AdapterConformanceCheck {
    // Dotted probe identifier, e.g. "singlestore.query".
    pub name: String,
    pub passed: bool,
    // Human-readable context: row counts, ids, or the error text.
    pub detail: String,
}
1487
/// Ordered collection of conformance check outcomes.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct AdapterConformanceReport {
    pub checks: Vec<AdapterConformanceCheck>,
}
1492
1493impl AdapterConformanceReport {
1494 pub fn passed(&self) -> bool {
1495 self.checks.iter().all(|check| check.passed)
1496 }
1497
1498 pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1499 self.checks.push(AdapterConformanceCheck {
1500 name: name.into(),
1501 passed: true,
1502 detail: detail.into(),
1503 });
1504 }
1505
1506 pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1507 self.checks.push(AdapterConformanceCheck {
1508 name: name.into(),
1509 passed: false,
1510 detail: detail.into(),
1511 });
1512 }
1513}
1514
/// Runs a smoke-test probe against every adapter and records pass/fail checks.
///
/// Probes, in order: SQL queries on SingleStore/ClickHouse/BigQuery, an
/// OpenSearch search, Trigger enqueue + status, an analytics event emission,
/// correlation/request id presence on the context, a context retry-policy
/// override, and timeout classification via `run_with_contract`. Check order
/// in the report mirrors this sequence.
pub fn run_adapter_conformance_suite(
    singlestore: &dyn SingleStoreAdapter,
    clickhouse: &dyn ClickHouseAdapter,
    bigquery: &dyn BigQueryAdapter,
    opensearch: &dyn OpenSearchAdapter,
    trigger: &dyn TriggerDevAdapter,
    analytics: &dyn AnalyticsSink,
    context: &QueryContext,
) -> AdapterConformanceReport {
    let mut report = AdapterConformanceReport::default();

    // SQL adapters: a plain SELECT should succeed and return some rows.
    match singlestore.run_query(
        SqlCommand::new(
            "SELECT tenant, active_accounts FROM tenant_rollup",
            Vec::new(),
        ),
        context,
    ) {
        Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
        Err(err) => report.add_fail("singlestore.query", err.to_string()),
    }

    match clickhouse.run_query(
        SqlCommand::new("SELECT account, events FROM account_events", Vec::new()),
        context,
    ) {
        Ok(rows) => report.add_pass("clickhouse.query", format!("rows={}", rows.len())),
        Err(err) => report.add_fail("clickhouse.query", err.to_string()),
    }

    match bigquery.run_query(
        SqlCommand::new("SELECT account, arr FROM analytics_rollup", Vec::new()),
        context,
    ) {
        Ok(rows) => report.add_pass("bigquery.query", format!("rows={}", rows.len())),
        Err(err) => report.add_fail("bigquery.query", err.to_string()),
    }

    // Empty search text: a match-all query against the "accounts" index.
    match opensearch.search(SearchRequest::new("accounts", ""), context) {
        Ok(response) => report.add_pass(
            "opensearch.search",
            format!(
                "rows={} total_hits={}",
                response.rows.len(),
                response.total_hits
            ),
        ),
        Err(err) => report.add_fail("opensearch.search", err.to_string()),
    }

    // Orchestrator: enqueue a probe workflow, then immediately read it back.
    match trigger.trigger_workflow(JobRequest::new(
        "adapter_conformance_probe",
        serde_json::json!({"probe": true}),
        "adapter-conformance-probe",
    )) {
        Ok(handle) => {
            report.add_pass("trigger.enqueue", format!("id={}", handle.id));
            match trigger.workflow_status(&handle.id) {
                Ok(status) => report.add_pass(
                    "trigger.status",
                    format!("state={:?} attempts={}", status.state, status.attempts),
                ),
                Err(err) => report.add_fail("trigger.status", err.to_string()),
            }
        }
        Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
    }

    // Analytics: the sink should accept an event carrying the context ids.
    let analytics_event = AnalyticsEvent::new(
        "adapter_conformance",
        "probe",
        serde_json::json!({
            "trace_id": context.trace_id,
            "correlation_id": context.correlation_id(),
            "request_id": context.request_id(),
        }),
    );
    match analytics.send_event(analytics_event) {
        Ok(()) => report.add_pass("analytics.emit", "event accepted"),
        Err(err) => report.add_fail("analytics.emit", err.to_string()),
    }

    // Context hygiene: both correlation and request ids must be present.
    let context_correlation_check =
        context.correlation_id().is_some() && context.request_id().is_some();
    if context_correlation_check {
        report.add_pass(
            "context.correlation",
            "correlation_id and request_id present",
        );
    } else {
        report.add_fail(
            "context.correlation",
            "missing correlation_id/request_id; set QueryContext::with_correlation_id + with_request_id",
        );
    }

    // Retry override: the closure fails on attempt 1 and succeeds on attempt
    // 2; with max_attempts=2 from the context override, the contract must
    // retry exactly once.
    let retry_override_context = context.clone().with_retry_policy(RetryPolicy {
        max_attempts: 2,
        initial_backoff_ms: 0,
        max_backoff_ms: 0,
    });
    let mut retry_attempts = 0u32;
    match run_with_contract(
        "conformance",
        "retry_override_probe",
        AdapterCallContract::default_query(),
        &retry_override_context,
        |attempt| {
            retry_attempts = attempt;
            if attempt == 1 {
                Err(IntegrationError::new(
                    "conformance",
                    IntegrationErrorKind::Transient,
                    "transient probe failure",
                ))
            } else {
                Ok("ok")
            }
        },
    ) {
        Ok(_) if retry_attempts == 2 => report.add_pass(
            "contract.retry_override",
            "retry override honored with bounded attempts",
        ),
        Ok(_) => report.add_fail(
            "contract.retry_override",
            format!("unexpected attempts={retry_attempts} expected=2"),
        ),
        Err(err) => report.add_fail("contract.retry_override", err.to_string()),
    }

    // Timeout: the closure sleeps well past the 1 ms context timeout, so the
    // contract must fail with a Timeout-classified error.
    let timeout_context = context.clone().with_timeout_ms(1);
    match run_with_contract(
        "conformance",
        "timeout_probe",
        AdapterCallContract::default_query(),
        &timeout_context,
        |_| {
            thread::sleep(Duration::from_millis(5));
            Ok::<_, IntegrationError>("done")
        },
    ) {
        Err(err) if err.kind == IntegrationErrorKind::Timeout => {
            report.add_pass("contract.timeout", "timeout classification validated")
        }
        Err(err) => report.add_fail("contract.timeout", err.to_string()),
        Ok(_) => report.add_fail(
            "contract.timeout",
            "expected timeout but operation returned success",
        ),
    }

    report
}
1669
1670pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1671 DataError::Integration(format!("[{}] {}", source.into(), err))
1672}
1673
1674pub fn map_integration_result<T>(
1675 source: impl Into<String>,
1676 result: IntegrationResult<T>,
1677) -> DataResult<T> {
1678 result.map_err(|err| map_integration_error(source, err))
1679}
1680
1681pub fn query_from_search(request: &SearchRequest) -> Query {
1682 let mut query = Query::new().paginate(request.page, request.per_page);
1683 for filter in &request.filters {
1684 query = query.where_filter(filter.clone());
1685 }
1686 query
1687}
1688
1689fn materialize_window_response(
1690 source: &str,
1691 request: DataWindowRequest,
1692 rows: Vec<Row>,
1693 total_rows_hint: Option<usize>,
1694) -> DataWindowResponse {
1695 let total_rows = total_rows_hint.unwrap_or(rows.len());
1696 let offset = request.offset.min(total_rows);
1697 let limit = request.limit.max(1);
1698 let window_rows = rows
1699 .into_iter()
1700 .skip(offset)
1701 .take(limit)
1702 .collect::<Vec<_>>();
1703 let query_token = request
1704 .query_token
1705 .unwrap_or_else(|| format!("{source}:{}:{}", request.dataset, total_rows));
1706 let window_token = request.window_token.unwrap_or_else(|| {
1707 format!(
1708 "{query_token}:offset={offset}:limit={limit}:rows={}",
1709 window_rows.len()
1710 )
1711 });
1712 let compact_rows = (request.wire_format == WireFormatProfile::Compact)
1713 .then(|| encode_compact_rows(&window_rows));
1714
1715 DataWindowResponse {
1716 dataset: request.dataset,
1717 offset,
1718 limit,
1719 total_rows,
1720 query_token,
1721 window_token,
1722 rows: window_rows,
1723 compact_rows,
1724 }
1725}
1726
1727fn encode_compact_rows(rows: &[Row]) -> CompactRowsPayload {
1728 let mut columns: Vec<String> = Vec::new();
1729 for row in rows {
1730 for key in row.keys() {
1731 if !columns.iter().any(|column| column == key) {
1732 columns.push(key.clone());
1733 }
1734 }
1735 }
1736
1737 let encoded_rows = rows
1738 .iter()
1739 .map(|row| {
1740 columns
1741 .iter()
1742 .map(|column| row.get(column).cloned().unwrap_or(Value::Null))
1743 .collect::<Vec<_>>()
1744 })
1745 .collect::<Vec<_>>();
1746
1747 CompactRowsPayload {
1748 columns,
1749 rows: encoded_rows,
1750 }
1751}
1752
1753#[cfg(test)]
1754mod tests {
1755 use super::{
1756 map_integration_result, run_adapter_conformance_suite, run_with_contract,
1757 run_with_contract_async, run_with_retry, run_with_retry_async, AdapterCallContract,
1758 AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge, BigQueryAdapter, ClickHouseAdapter,
1759 ConnectionLifecycle, ConnectionLifecycleHook, DataWindowRequest, InMemoryAxiomSink,
1760 InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryJobOrchestrator,
1761 InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
1762 IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
1763 LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
1764 SearchResponse, SingleStoreAdapter, SqlCommand, TriggerDevAdapter, TypedQueryBoundary,
1765 CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID, CONTEXT_TAG_TIMEOUT_MS,
1766 };
1767 use serde_json::{json, Value};
1768 use std::sync::{Arc, Mutex};
1769
    // Test double that counts lifecycle hook invocations via shared counters.
    struct CountingHook {
        connects: Arc<Mutex<u32>>,
        disconnects: Arc<Mutex<u32>>,
    }
1774
    impl ConnectionLifecycleHook for CountingHook {
        // Bumps the shared connect counter; never fails.
        fn on_connect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.connects.lock().unwrap();
            *guard += 1;
            Ok(())
        }

        // Bumps the shared disconnect counter; never fails.
        fn on_disconnect(&self) -> super::IntegrationResult<()> {
            let mut guard = self.disconnects.lock().unwrap();
            *guard += 1;
            Ok(())
        }
    }
1788
    // A registered hook fires exactly once per connect and per disconnect.
    #[test]
    fn lifecycle_hooks_are_called() {
        let connects = Arc::new(Mutex::new(0));
        let disconnects = Arc::new(Mutex::new(0));
        let mut lifecycle = LifecycleHooks::new();
        lifecycle.register_hook(Arc::new(CountingHook {
            connects: connects.clone(),
            disconnects: disconnects.clone(),
        }));

        lifecycle.connect().unwrap();
        lifecycle.disconnect().unwrap();
        assert_eq!(*connects.lock().unwrap(), 1);
        assert_eq!(*disconnects.lock().unwrap(), 1);
    }
1804
    // Transient errors are retried until the closure succeeds (attempt 3 here).
    #[test]
    fn retry_policy_retries_transient_errors() {
        let mut calls = 0u32;
        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            if attempt < 3 {
                Err(IntegrationError::new(
                    "opensearch",
                    IntegrationErrorKind::Transient,
                    "temporary failure",
                ))
            } else {
                Ok("ok")
            }
        })
        .unwrap();

        assert_eq!(result, "ok");
        assert_eq!(calls, 3);
    }
1825
    // Permanent errors abort immediately: exactly one attempt, error returned.
    #[test]
    fn retry_policy_stops_on_permanent_errors() {
        let mut calls = 0u32;
        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
            calls = attempt;
            Err::<(), IntegrationError>(IntegrationError::new(
                "singlestore",
                IntegrationErrorKind::Permanent,
                "invalid sql",
            ))
        })
        .unwrap_err();

        assert_eq!(calls, 1);
        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
    }
1842
    // Mapping preserves the original error message inside the DataError text.
    #[test]
    fn integration_result_maps_into_data_error() {
        let mapped = map_integration_result::<()>(
            "trigger",
            Err(IntegrationError::new(
                "trigger",
                IntegrationErrorKind::Unavailable,
                "service unavailable",
            )),
        )
        .unwrap_err();

        assert!(mapped.to_string().contains("service unavailable"));
    }
1857
    // Minimal JobOrchestrator double: ids derived from the idempotency key.
    #[derive(Default)]
    struct InMemoryJobs {
        statuses: Arc<Mutex<Vec<JobStatus>>>,
        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
    }
1863
    impl JobOrchestrator for InMemoryJobs {
        // Stores a Queued status under "job-<idempotency_key>".
        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
            let id = format!("job-{}", request.idempotency_key);
            self.statuses.lock().unwrap().push(JobStatus {
                id: id.clone(),
                state: JobState::Queued,
                attempts: 0,
                result: None,
                error: None,
            });
            Ok(JobHandle {
                id,
                workflow: request.workflow,
                idempotency_key: request.idempotency_key,
            })
        }

        // Linear scan by id; InvalidInput when the job was never enqueued.
        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
            self.statuses
                .lock()
                .unwrap()
                .iter()
                .find(|status| status.id == id)
                .cloned()
                .ok_or_else(|| {
                    IntegrationError::new(
                        "trigger",
                        IntegrationErrorKind::InvalidInput,
                        "job not found",
                    )
                })
        }

        // No waiting in the double: a single status read.
        fn poll(
            &self,
            id: &str,
            _attempts: u32,
            _backoff_ms: u64,
        ) -> super::IntegrationResult<JobStatus> {
            self.status(id)
        }

        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
            self.callbacks.lock().unwrap().push(callback);
        }
    }
1910
    // Enqueue yields a readable Queued status and preserves the idempotency key.
    #[test]
    fn job_orchestration_contract_supports_enqueue_and_status() {
        let jobs = InMemoryJobs::default();
        let handle = jobs
            .enqueue(JobRequest::new(
                "sync_customer",
                json!({"id": 42}),
                "idempotent-42",
            ))
            .unwrap();
        let status = jobs.status(&handle.id).unwrap();
        assert_eq!(status.state, JobState::Queued);
        assert_eq!(handle.idempotency_key, "idempotent-42");

        let ctx = QueryContext::default().with_tenant_id("tenant-a");
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
    }
1928
    // The SingleStore reference adapter echoes its seeded rows for a query.
    #[test]
    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
            "region".to_string(),
            Value::String("EMEA".to_string()),
        )])]);
        let rows = adapter
            .run_query(
                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(
            rows[0].get("region"),
            Some(&Value::String("EMEA".to_string()))
        );
    }
1947
    // The ClickHouse reference adapter echoes its seeded rows for a query.
    #[test]
    fn reference_clickhouse_adapter_runs_typed_sql_boundary() {
        let adapter = InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
            "events".to_string(),
            Value::Number(128.into()),
        )])]);
        let rows = adapter
            .run_query(
                SqlCommand::new("SELECT events FROM account_events", Vec::new()),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("events"), Some(&Value::Number(128.into())));
    }
1963
    // The BigQuery reference adapter echoes its seeded rows for a query.
    #[test]
    fn reference_bigquery_adapter_runs_typed_sql_boundary() {
        let adapter = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
            "active_accounts".to_string(),
            Value::Number(64.into()),
        )])]);
        let rows = adapter
            .run_query(
                SqlCommand::new("SELECT active_accounts FROM analytics_rollup", Vec::new()),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(
            rows[0].get("active_accounts"),
            Some(&Value::Number(64.into()))
        );
    }
1982
    // Case-insensitive substring search matches only the "renewal" row.
    #[test]
    fn reference_opensearch_adapter_filters_rows() {
        let rows = vec![
            crate::StoredRow {
                id: 1,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Acme renewal".to_string()),
                )]),
            },
            crate::StoredRow {
                id: 2,
                data: std::collections::BTreeMap::from([(
                    "title".to_string(),
                    Value::String("Globex onboarding".to_string()),
                )]),
            },
        ];
        let adapter = InMemoryOpenSearchAdapter::new(rows);
        let response = adapter
            .search(
                SearchRequest::new("accounts", "renewal"),
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(response.total_hits, 1);
        assert_eq!(response.rows[0].id, 1);
    }
2011
    // All three SQL adapters honor the window contract (offset=1, limit=2)
    // and emit a columnar compact payload matching the windowed rows.
    #[test]
    fn high_volume_window_query_profiles_support_compact_payloads() {
        let sql_rows = vec![
            std::collections::BTreeMap::from([
                ("tenant".to_string(), Value::String("north".to_string())),
                ("arr".to_string(), Value::Number(120.into())),
            ]),
            std::collections::BTreeMap::from([
                ("tenant".to_string(), Value::String("south".to_string())),
                ("arr".to_string(), Value::Number(210.into())),
            ]),
            std::collections::BTreeMap::from([
                ("tenant".to_string(), Value::String("west".to_string())),
                ("arr".to_string(), Value::Number(330.into())),
            ]),
            std::collections::BTreeMap::from([
                ("tenant".to_string(), Value::String("east".to_string())),
                ("arr".to_string(), Value::Number(480.into())),
            ]),
        ];
        let context = QueryContext::default();
        let request = DataWindowRequest::new("tenant_rollup", 1, 2)
            .with_query_token("q:m51")
            .compact();

        let singlestore = InMemorySingleStoreAdapter::new(sql_rows.clone());
        let clickhouse = InMemoryClickHouseAdapter::new(sql_rows.clone());
        let bigquery = InMemoryBigQueryAdapter::new(sql_rows.clone());

        for response in [
            singlestore
                .run_high_volume_window_query(
                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
                    request.clone(),
                    &context,
                )
                .expect("singlestore window query"),
            clickhouse
                .run_high_volume_window_query(
                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
                    request.clone(),
                    &context,
                )
                .expect("clickhouse window query"),
            bigquery
                .run_high_volume_window_query(
                    SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
                    request.clone(),
                    &context,
                )
                .expect("bigquery window query"),
        ] {
            assert_eq!(response.dataset, "tenant_rollup");
            assert_eq!(response.rows.len(), 2);
            assert_eq!(response.total_rows, 4);
            assert_eq!(response.query_token, "q:m51");
            let compact = response.compact_rows.expect("compact payload");
            assert!(compact.columns.contains(&"tenant".to_string()));
            assert!(compact.columns.contains(&"arr".to_string()));
            assert_eq!(compact.rows.len(), response.rows.len());
        }
    }
2074
    // Window search: match-all over 3 rows, offset=1/limit=2, compact payload
    // and a non-empty generated window token.
    #[test]
    fn opensearch_window_search_uses_window_contract() {
        let rows = vec![
            crate::StoredRow {
                id: 1,
                data: std::collections::BTreeMap::from([
                    ("tenant".to_string(), Value::String("acme".to_string())),
                    ("status".to_string(), Value::String("healthy".to_string())),
                ]),
            },
            crate::StoredRow {
                id: 2,
                data: std::collections::BTreeMap::from([
                    ("tenant".to_string(), Value::String("globex".to_string())),
                    ("status".to_string(), Value::String("renewal".to_string())),
                ]),
            },
            crate::StoredRow {
                id: 3,
                data: std::collections::BTreeMap::from([
                    ("tenant".to_string(), Value::String("initech".to_string())),
                    ("status".to_string(), Value::String("at-risk".to_string())),
                ]),
            },
        ];
        let adapter = InMemoryOpenSearchAdapter::new(rows);
        let response = adapter
            .search_window(
                SearchRequest::new("accounts", ""),
                DataWindowRequest::new("accounts", 1, 2).compact(),
                &QueryContext::default(),
            )
            .expect("opensearch window search");

        assert_eq!(response.dataset, "accounts");
        assert_eq!(response.offset, 1);
        assert_eq!(response.limit, 2);
        assert_eq!(response.total_rows, 3);
        assert_eq!(response.rows.len(), 2);
        assert!(response.compact_rows.is_some());
        assert!(!response.window_token.is_empty());
    }
2117
    // A sent event is retrievable from the sink's in-memory buffer.
    #[test]
    fn reference_axiom_sink_records_events() {
        let sink = InMemoryAxiomSink::default();
        sink.send_event(AnalyticsEvent::new(
            "sales",
            "query_executed",
            json!({"latency_ms": 12}),
        ))
        .unwrap();
        let events = sink.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "query_executed");
    }
2131
    // mark_succeeded stores the result, fires the completion callback, and
    // poll observes the terminal Succeeded state.
    #[test]
    fn reference_trigger_orchestrator_supports_completion_and_polling() {
        let orchestrator = InMemoryJobOrchestrator::default();
        let completed = Arc::new(Mutex::new(false));
        let completed_flag = completed.clone();
        orchestrator.register_completion_callback(Arc::new(move |status| {
            if status.state == JobState::Succeeded {
                if let Ok(mut guard) = completed_flag.lock() {
                    *guard = true;
                }
            }
        }));

        let handle = orchestrator
            .enqueue(JobRequest::new(
                "refresh_dashboard",
                json!({"account_id": 7}),
                "refresh-7",
            ))
            .unwrap();
        orchestrator
            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
            .unwrap();
        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
        assert_eq!(status.state, JobState::Succeeded);
        assert_eq!(
            status.result,
            Some(json!({
                "rows_synced": 18
            }))
        );
        assert!(*completed.lock().unwrap());
    }
2165
    // Builder helpers store their values in the context tag map and expose
    // them through the typed accessors.
    #[test]
    fn query_context_helpers_encode_trace_correlation_retry_timeout() {
        let context = QueryContext::default()
            .with_correlation_id("corr-42")
            .with_request_id("req-42")
            .with_timeout_ms(900)
            .with_retry_policy(RetryPolicy {
                max_attempts: 4,
                initial_backoff_ms: 25,
                max_backoff_ms: 100,
            });

        assert_eq!(
            context
                .tags
                .get(CONTEXT_TAG_CORRELATION_ID)
                .map(String::as_str),
            Some("corr-42")
        );
        assert_eq!(
            context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
            Some("req-42")
        );
        assert_eq!(
            context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
            Some("900")
        );
        assert_eq!(context.correlation_id(), Some("corr-42"));
        assert_eq!(context.request_id(), Some("req-42"));
        assert_eq!(context.timeout_ms(), Some(900));
        assert_eq!(
            context.retry_policy_override(),
            Some(RetryPolicy {
                max_attempts: 4,
                initial_backoff_ms: 25,
                max_backoff_ms: 100
            })
        );
    }
2205
    // The context retry override (max_attempts=2) wins over the contract's
    // RetryPolicy::never(), so the transient failure is retried once.
    #[test]
    fn run_with_contract_respects_context_retry_override() {
        let context = QueryContext::default().with_retry_policy(RetryPolicy {
            max_attempts: 2,
            initial_backoff_ms: 0,
            max_backoff_ms: 0,
        });
        let mut attempts = 0u32;
        let result = run_with_contract(
            "opensearch",
            "search",
            AdapterCallContract::default()
                .with_retry_policy(RetryPolicy::never())
                .with_timeout_ms(1_000),
            &context,
            |attempt| {
                attempts = attempt;
                if attempt == 1 {
                    Err(IntegrationError::new(
                        "opensearch",
                        IntegrationErrorKind::Transient,
                        "temporary failure",
                    ))
                } else {
                    Ok("ok")
                }
            },
        )
        .unwrap();
        assert_eq!(result, "ok");
        assert_eq!(attempts, 2);
    }
2238
    // Async variant of the retry-override test; attempts tracked via a shared
    // mutex because the closure is moved into the future.
    #[tokio::test]
    async fn run_with_contract_async_respects_context_retry_override() {
        let context = QueryContext::default().with_retry_policy(RetryPolicy {
            max_attempts: 2,
            initial_backoff_ms: 0,
            max_backoff_ms: 0,
        });
        let attempts = Arc::new(Mutex::new(0u32));
        let attempts_for_closure = attempts.clone();
        let result = run_with_contract_async(
            "opensearch",
            "search_async",
            AdapterCallContract::default()
                .with_retry_policy(RetryPolicy::never())
                .with_timeout_ms(1_000),
            &context,
            move |attempt| {
                let attempts_for_step = attempts_for_closure.clone();
                async move {
                    if let Ok(mut guard) = attempts_for_step.lock() {
                        *guard = attempt;
                    }
                    if attempt == 1 {
                        Err(IntegrationError::new(
                            "opensearch",
                            IntegrationErrorKind::Transient,
                            "temporary failure",
                        ))
                    } else {
                        Ok("ok")
                    }
                }
            },
        )
        .await
        .unwrap();
        assert_eq!(result, "ok");
        assert_eq!(*attempts.lock().unwrap(), 2);
    }
2278
    // Async retry helper keeps retrying transient errors until attempt 3.
    #[tokio::test]
    async fn run_with_retry_async_retries_transient_errors() {
        let calls = Arc::new(Mutex::new(0u32));
        let calls_for_closure = calls.clone();
        let result = run_with_retry_async(RetryPolicy::conservative(), move |attempt| {
            let calls_for_step = calls_for_closure.clone();
            async move {
                if let Ok(mut guard) = calls_for_step.lock() {
                    *guard = attempt;
                }
                if attempt < 3 {
                    Err(IntegrationError::new(
                        "bigquery",
                        IntegrationErrorKind::Transient,
                        "temporary failure",
                    ))
                } else {
                    Ok("ok")
                }
            }
        })
        .await
        .unwrap();
        assert_eq!(result, "ok");
        assert_eq!(*calls.lock().unwrap(), 3);
    }
2305
    // Sleeping past the 5 ms context timeout yields a Timeout-classified
    // error with the "operation_timeout" code.
    #[test]
    fn run_with_contract_returns_timeout_error() {
        let context = QueryContext::default().with_timeout_ms(5);
        let err = run_with_contract(
            "singlestore",
            "run_query",
            AdapterCallContract::default(),
            &context,
            |_| {
                std::thread::sleep(std::time::Duration::from_millis(15));
                Ok::<_, IntegrationError>("done")
            },
        )
        .unwrap_err();
        assert_eq!(err.kind, IntegrationErrorKind::Timeout);
        assert_eq!(err.code.as_deref(), Some("operation_timeout"));
    }
2323
2324 #[test]
2325 fn axiom_bridge_enriches_events_with_trace_and_correlation() {
2326 let sink = Arc::new(InMemoryAxiomSink::default());
2327 let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
2328 let context = QueryContext {
2329 tenant_id: Some("tenant-a".to_string()),
2330 trace_id: Some("trace-123".to_string()),
2331 tags: std::collections::BTreeMap::new(),
2332 }
2333 .with_correlation_id("corr-9")
2334 .with_request_id("req-9");
2335 bridge
2336 .emit("session_event", json!({"event":"patch"}), &context)
2337 .unwrap();
2338 let events = sink.events();
2339 assert_eq!(events.len(), 1);
2340 let payload = events[0].payload.as_object().unwrap();
2341 assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
2342 assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
2343 assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
2344 }
2345
2346 #[test]
2347 fn conformance_suite_passes_with_reference_adapters() {
2348 let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
2349 [("tenant".to_string(), Value::String("north".to_string()))],
2350 )]);
2351 let clickhouse =
2352 InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
2353 "events".to_string(),
2354 Value::Number(99.into()),
2355 )])]);
2356 let bigquery = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
2357 "accounts".to_string(),
2358 Value::Number(33.into()),
2359 )])]);
2360 let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
2361 id: 1,
2362 data: std::collections::BTreeMap::from([(
2363 "title".to_string(),
2364 Value::String("Acme renewal".to_string()),
2365 )]),
2366 }]);
2367 let trigger = InMemoryJobOrchestrator::default();
2368 let analytics = InMemoryAxiomSink::default();
2369 let report = run_adapter_conformance_suite(
2370 &singlestore,
2371 &clickhouse,
2372 &bigquery,
2373 &opensearch,
2374 &trigger,
2375 &analytics,
2376 &QueryContext::default()
2377 .with_correlation_id("corr-1")
2378 .with_request_id("req-1"),
2379 );
2380 assert!(report.passed(), "report={report:?}");
2381 assert!(report.checks.len() >= 8);
2382 }
2383
2384 #[test]
2385 fn adapter_error_paths_cover_invalid_inputs_and_failure_reporting() {
2386 let context = QueryContext::default();
2387 let empty_sql = SqlCommand::new(" ", Vec::new());
2388 let singlestore = InMemorySingleStoreAdapter::default();
2389 let clickhouse = InMemoryClickHouseAdapter::default();
2390 let bigquery = InMemoryBigQueryAdapter::default();
2391 let opensearch = InMemoryOpenSearchAdapter::default();
2392
2393 for err in [
2394 singlestore
2395 .run_query(empty_sql.clone(), &context)
2396 .unwrap_err(),
2397 clickhouse
2398 .run_query(empty_sql.clone(), &context)
2399 .unwrap_err(),
2400 bigquery.run_query(empty_sql, &context).unwrap_err(),
2401 ] {
2402 assert_eq!(err.kind, IntegrationErrorKind::InvalidInput);
2403 assert_eq!(err.code.as_deref(), Some("empty_statement"));
2404 }
2405
2406 let search_err = opensearch
2407 .search(SearchRequest::new(" ", "needle"), &context)
2408 .unwrap_err();
2409 assert_eq!(search_err.kind, IntegrationErrorKind::InvalidInput);
2410 assert_eq!(search_err.code.as_deref(), Some("empty_index"));
2411 }
2412
    /// Test double whose SQL queries always fail.
    struct FailingSqlAdapter {
        // Echoed back as the `source` of every produced `IntegrationError`,
        // letting assertions distinguish which adapter failed.
        source: &'static str,
    }
2416
2417 impl TypedQueryBoundary for FailingSqlAdapter {
2418 type Request = SqlCommand;
2419 type Response = Vec<crate::Row>;
2420
2421 fn execute(
2422 &self,
2423 _request: &Self::Request,
2424 _context: &QueryContext,
2425 ) -> super::IntegrationResult<Self::Response> {
2426 Err(IntegrationError::new(
2427 self.source,
2428 IntegrationErrorKind::Unavailable,
2429 "backend unavailable",
2430 )
2431 .with_code("backend_down"))
2432 }
2433 }
2434
    // Marker impls so the one failing double can stand in for every SQL backend.
    impl SingleStoreAdapter for FailingSqlAdapter {}
    impl ClickHouseAdapter for FailingSqlAdapter {}
    impl BigQueryAdapter for FailingSqlAdapter {}
2438
2439 struct FailingSearchAdapter;
2440
2441 impl TypedQueryBoundary for FailingSearchAdapter {
2442 type Request = SearchRequest;
2443 type Response = SearchResponse;
2444
2445 fn execute(
2446 &self,
2447 _request: &Self::Request,
2448 _context: &QueryContext,
2449 ) -> super::IntegrationResult<Self::Response> {
2450 Err(IntegrationError::new(
2451 "opensearch",
2452 IntegrationErrorKind::Unavailable,
2453 "search unavailable",
2454 ))
2455 }
2456 }
2457
2458 impl OpenSearchAdapter for FailingSearchAdapter {}
2459
2460 struct FailingTrigger;
2461
2462 impl TriggerDevAdapter for FailingTrigger {
2463 fn trigger_workflow(&self, _request: JobRequest) -> super::IntegrationResult<JobHandle> {
2464 Err(IntegrationError::new(
2465 "trigger",
2466 IntegrationErrorKind::Unavailable,
2467 "trigger unavailable",
2468 ))
2469 }
2470
2471 fn workflow_status(&self, _id: &str) -> super::IntegrationResult<JobStatus> {
2472 Err(IntegrationError::new(
2473 "trigger",
2474 IntegrationErrorKind::Unavailable,
2475 "trigger unavailable",
2476 ))
2477 }
2478
2479 fn poll_workflow(
2480 &self,
2481 _id: &str,
2482 _attempts: u32,
2483 _backoff_ms: u64,
2484 ) -> super::IntegrationResult<JobStatus> {
2485 Err(IntegrationError::new(
2486 "trigger",
2487 IntegrationErrorKind::Unavailable,
2488 "trigger unavailable",
2489 ))
2490 }
2491 }
2492
    /// Test double whose event emission always fails with `Unavailable`.
    #[derive(Default)]
    struct FailingAnalyticsSink;
2495
2496 impl AnalyticsSink for FailingAnalyticsSink {
2497 fn send_event(&self, _event: AnalyticsEvent) -> super::IntegrationResult<()> {
2498 Err(IntegrationError::new(
2499 "analytics",
2500 IntegrationErrorKind::Unavailable,
2501 "sink unavailable",
2502 ))
2503 }
2504 }
2505
2506 #[test]
2507 fn conformance_suite_captures_failures_for_unavailable_dependencies() {
2508 let failing_sql = FailingSqlAdapter { source: "sql" };
2509 let failing_search = FailingSearchAdapter;
2510 let failing_trigger = FailingTrigger;
2511 let failing_analytics = FailingAnalyticsSink;
2512
2513 let report = run_adapter_conformance_suite(
2514 &failing_sql,
2515 &failing_sql,
2516 &failing_sql,
2517 &failing_search,
2518 &failing_trigger,
2519 &failing_analytics,
2520 &QueryContext::default(),
2521 );
2522
2523 assert!(!report.passed());
2524 assert!(report
2525 .checks
2526 .iter()
2527 .any(|check| { check.name == "singlestore.query" && !check.passed }));
2528 assert!(report
2529 .checks
2530 .iter()
2531 .any(|check| { check.name == "opensearch.search" && !check.passed }));
2532 assert!(report
2533 .checks
2534 .iter()
2535 .any(|check| { check.name == "trigger.enqueue" && !check.passed }));
2536 assert!(report
2537 .checks
2538 .iter()
2539 .any(|check| { check.name == "analytics.emit" && !check.passed }));
2540 assert!(report
2541 .checks
2542 .iter()
2543 .any(|check| { check.name == "context.correlation" && !check.passed }));
2544 }
2545}