1use crate::{
2 error::{DataError, DataResult},
3 query::{Query, WireFormatProfile},
4 repo::{CompactRowsPayload, Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt,
11 future::Future,
12 sync::{Arc, Mutex},
13 thread,
14 time::{Duration, Instant},
15};
16use tokio::time::sleep;
17use tracing::{info, warn};
18
/// Tag key under which a `QueryContext` stores the caller's correlation id.
pub const CONTEXT_TAG_CORRELATION_ID: &'static str = "correlation_id";
/// Tag key under which a `QueryContext` stores the caller's request id.
pub const CONTEXT_TAG_REQUEST_ID: &'static str = "request_id";
/// Tag key for a per-call timeout override, in milliseconds.
pub const CONTEXT_TAG_TIMEOUT_MS: &'static str = "timeout_ms";
/// Tag key for a per-call override of `RetryPolicy::max_attempts`.
pub const CONTEXT_TAG_RETRY_MAX_ATTEMPTS: &'static str = "retry_max_attempts";
/// Tag key for a per-call override of `RetryPolicy::initial_backoff_ms`.
pub const CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS: &'static str = "retry_initial_backoff_ms";
/// Tag key for a per-call override of `RetryPolicy::max_backoff_ms`.
pub const CONTEXT_TAG_RETRY_MAX_BACKOFF_MS: &'static str = "retry_max_backoff_ms";
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum IntegrationErrorKind {
29 Transient,
30 Permanent,
31 Auth,
32 RateLimited,
33 Timeout,
34 Unavailable,
35 InvalidInput,
36}
37
38#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39pub struct IntegrationError {
40 pub source: String,
41 pub kind: IntegrationErrorKind,
42 pub message: String,
43 pub code: Option<String>,
44 pub retryable: bool,
45}
46
47impl IntegrationError {
48 pub fn new(
49 source: impl Into<String>,
50 kind: IntegrationErrorKind,
51 message: impl Into<String>,
52 ) -> Self {
53 let kind_value = kind;
54 Self {
55 source: source.into(),
56 kind: kind_value,
57 message: message.into(),
58 code: None,
59 retryable: matches!(
60 kind_value,
61 IntegrationErrorKind::Transient
62 | IntegrationErrorKind::RateLimited
63 | IntegrationErrorKind::Timeout
64 | IntegrationErrorKind::Unavailable
65 ),
66 }
67 }
68
69 pub fn with_code(mut self, code: impl Into<String>) -> Self {
70 self.code = Some(code.into());
71 self
72 }
73}
74
75impl fmt::Display for IntegrationError {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 if let Some(code) = &self.code {
78 write!(f, "[{}:{}] {}", self.source, code, self.message)
79 } else {
80 write!(f, "[{}] {}", self.source, self.message)
81 }
82 }
83}
84
85impl std::error::Error for IntegrationError {}
86
87pub type IntegrationResult<T> = Result<T, IntegrationError>;
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
90pub struct RetryPolicy {
91 pub max_attempts: u32,
92 pub initial_backoff_ms: u64,
93 pub max_backoff_ms: u64,
94}
95
96impl RetryPolicy {
97 pub fn conservative() -> Self {
98 Self {
99 max_attempts: 3,
100 initial_backoff_ms: 50,
101 max_backoff_ms: 500,
102 }
103 }
104
105 pub fn never() -> Self {
106 Self {
107 max_attempts: 1,
108 initial_backoff_ms: 0,
109 max_backoff_ms: 0,
110 }
111 }
112}
113
114impl Default for RetryPolicy {
115 fn default() -> Self {
116 Self::conservative()
117 }
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
121pub struct AdapterCallContract {
122 pub retry_policy: RetryPolicy,
123 pub timeout_ms: u64,
124}
125
126impl AdapterCallContract {
127 pub fn low_latency() -> Self {
128 Self {
129 retry_policy: RetryPolicy::never(),
130 timeout_ms: 250,
131 }
132 }
133
134 pub fn default_query() -> Self {
135 Self {
136 retry_policy: RetryPolicy::conservative(),
137 timeout_ms: 1_500,
138 }
139 }
140
141 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
142 self.retry_policy = retry_policy;
143 self
144 }
145
146 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
147 self.timeout_ms = timeout_ms.max(1);
148 self
149 }
150}
151
152impl Default for AdapterCallContract {
153 fn default() -> Self {
154 Self::default_query()
155 }
156}
157
158pub fn run_with_contract<T, F>(
159 source: &str,
160 operation: &str,
161 contract: AdapterCallContract,
162 context: &QueryContext,
163 mut operation_fn: F,
164) -> IntegrationResult<T>
165where
166 F: FnMut(u32) -> IntegrationResult<T>,
167{
168 let policy = context
169 .retry_policy_override()
170 .unwrap_or(contract.retry_policy);
171 let attempts = policy.max_attempts.max(1);
172 let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
173 let mut backoff_ms = policy.initial_backoff_ms;
174
175 for attempt in 1..=attempts {
176 let started_at = Instant::now();
177 let result = operation_fn(attempt);
178 let elapsed_ms = started_at.elapsed().as_millis() as u64;
179 if elapsed_ms > timeout_ms {
180 return Err(IntegrationError::new(
181 source,
182 IntegrationErrorKind::Timeout,
183 format!(
184 "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
185 elapsed_ms, timeout_ms
186 ),
187 )
188 .with_code("operation_timeout"));
189 }
190
191 match result {
192 Ok(value) => return Ok(value),
193 Err(err) if !err.retryable || attempt == attempts => return Err(err),
194 Err(err) => {
195 warn!(
196 target: "shelly.integration.query",
197 source,
198 operation,
199 attempt,
200 max_attempts = attempts,
201 tenant_id = ?context.tenant_id,
202 trace_id = ?context.trace_id,
203 correlation_id = context.correlation_id().unwrap_or("-"),
204 request_id = context.request_id().unwrap_or("-"),
205 timeout_ms,
206 backoff_ms,
207 error = %err,
208 "Shelly integration retrying transient failure without blocking sleep; use run_with_contract_async for non-blocking backoff"
209 );
210 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
211 }
212 }
213 }
214
215 Err(IntegrationError::new(
216 source,
217 IntegrationErrorKind::Transient,
218 format!("{operation} retry exhausted"),
219 )
220 .with_code("retry_exhausted"))
221}
222
223pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
224where
225 F: FnMut(u32) -> IntegrationResult<T>,
226{
227 let attempts = policy.max_attempts.max(1);
228 let mut backoff_ms = policy.initial_backoff_ms;
229
230 for attempt in 1..=attempts {
231 match operation(attempt) {
232 Ok(value) => return Ok(value),
233 Err(err) if !err.retryable || attempt == attempts => return Err(err),
234 Err(_) => {
235 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
236 }
237 }
238 }
239
240 Err(IntegrationError::new(
241 "retry",
242 IntegrationErrorKind::Transient,
243 "retry exhausted",
244 ))
245}
246
247pub async fn run_with_contract_async<T, F, Fut>(
248 source: &str,
249 operation: &str,
250 contract: AdapterCallContract,
251 context: &QueryContext,
252 mut operation_fn: F,
253) -> IntegrationResult<T>
254where
255 F: FnMut(u32) -> Fut,
256 Fut: Future<Output = IntegrationResult<T>>,
257{
258 let policy = context
259 .retry_policy_override()
260 .unwrap_or(contract.retry_policy);
261 let attempts = policy.max_attempts.max(1);
262 let timeout_ms = context.timeout_ms().unwrap_or(contract.timeout_ms.max(1));
263 let mut backoff_ms = policy.initial_backoff_ms;
264
265 for attempt in 1..=attempts {
266 let started_at = Instant::now();
267 let result = operation_fn(attempt).await;
268 let elapsed_ms = started_at.elapsed().as_millis() as u64;
269 if elapsed_ms > timeout_ms {
270 return Err(IntegrationError::new(
271 source,
272 IntegrationErrorKind::Timeout,
273 format!(
274 "{operation} exceeded timeout: elapsed={}ms timeout={}ms",
275 elapsed_ms, timeout_ms
276 ),
277 )
278 .with_code("operation_timeout"));
279 }
280
281 match result {
282 Ok(value) => return Ok(value),
283 Err(err) if !err.retryable || attempt == attempts => return Err(err),
284 Err(err) => {
285 warn!(
286 target: "shelly.integration.query",
287 source,
288 operation,
289 attempt,
290 max_attempts = attempts,
291 tenant_id = ?context.tenant_id,
292 trace_id = ?context.trace_id,
293 correlation_id = context.correlation_id().unwrap_or("-"),
294 request_id = context.request_id().unwrap_or("-"),
295 timeout_ms,
296 backoff_ms,
297 error = %err,
298 "Shelly integration retrying transient failure with non-blocking backoff"
299 );
300 if backoff_ms > 0 {
301 sleep(Duration::from_millis(backoff_ms)).await;
302 }
303 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
304 }
305 }
306 }
307
308 Err(IntegrationError::new(
309 source,
310 IntegrationErrorKind::Transient,
311 format!("{operation} retry exhausted"),
312 )
313 .with_code("retry_exhausted"))
314}
315
316pub async fn run_with_retry_async<T, F, Fut>(
317 policy: RetryPolicy,
318 mut operation: F,
319) -> IntegrationResult<T>
320where
321 F: FnMut(u32) -> Fut,
322 Fut: Future<Output = IntegrationResult<T>>,
323{
324 let attempts = policy.max_attempts.max(1);
325 let mut backoff_ms = policy.initial_backoff_ms;
326
327 for attempt in 1..=attempts {
328 match operation(attempt).await {
329 Ok(value) => return Ok(value),
330 Err(err) if !err.retryable || attempt == attempts => return Err(err),
331 Err(_) => {
332 if backoff_ms > 0 {
333 sleep(Duration::from_millis(backoff_ms)).await;
334 }
335 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
336 }
337 }
338 }
339
340 Err(IntegrationError::new(
341 "retry",
342 IntegrationErrorKind::Transient,
343 "retry exhausted",
344 ))
345}
346
347pub trait ConnectionLifecycleHook: Send + Sync {
348 fn on_connect(&self) -> IntegrationResult<()> {
349 Ok(())
350 }
351
352 fn on_disconnect(&self) -> IntegrationResult<()> {
353 Ok(())
354 }
355}
356
357pub trait ConnectionLifecycle {
358 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
359 fn connect(&self) -> IntegrationResult<()>;
360 fn disconnect(&self) -> IntegrationResult<()>;
361}
362
363#[derive(Default)]
364pub struct LifecycleHooks {
365 hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
366}
367
368impl LifecycleHooks {
369 pub fn new() -> Self {
370 Self::default()
371 }
372}
373
374impl ConnectionLifecycle for LifecycleHooks {
375 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
376 self.hooks.push(hook);
377 }
378
379 fn connect(&self) -> IntegrationResult<()> {
380 for hook in &self.hooks {
381 hook.on_connect()?;
382 }
383 Ok(())
384 }
385
386 fn disconnect(&self) -> IntegrationResult<()> {
387 for hook in &self.hooks {
388 hook.on_disconnect()?;
389 }
390 Ok(())
391 }
392}
393
394#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
395pub struct QueryContext {
396 pub tenant_id: Option<String>,
397 pub trace_id: Option<String>,
398 pub tags: BTreeMap<String, String>,
399}
400
401impl QueryContext {
402 pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
403 let tenant_id = tenant_id.into();
404 let tenant_id = tenant_id.trim();
405 if tenant_id.is_empty() {
406 self.tenant_id = None;
407 } else {
408 self.tenant_id = Some(tenant_id.to_string());
409 }
410 self
411 }
412
413 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
414 self.tags.insert(key.into(), value.into());
415 self
416 }
417
418 pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
419 self.with_tag(CONTEXT_TAG_CORRELATION_ID, correlation_id.into())
420 }
421
422 pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
423 self.with_tag(CONTEXT_TAG_REQUEST_ID, request_id.into())
424 }
425
426 pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
427 self.with_tag(CONTEXT_TAG_TIMEOUT_MS, timeout_ms.to_string())
428 }
429
430 pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
431 self.with_tag(
432 CONTEXT_TAG_RETRY_MAX_ATTEMPTS,
433 policy.max_attempts.to_string(),
434 )
435 .with_tag(
436 CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS,
437 policy.initial_backoff_ms.to_string(),
438 )
439 .with_tag(
440 CONTEXT_TAG_RETRY_MAX_BACKOFF_MS,
441 policy.max_backoff_ms.to_string(),
442 )
443 }
444
445 pub fn correlation_id(&self) -> Option<&str> {
446 self.tags
447 .get(CONTEXT_TAG_CORRELATION_ID)
448 .map(String::as_str)
449 .map(str::trim)
450 .filter(|value| !value.is_empty())
451 }
452
453 pub fn request_id(&self) -> Option<&str> {
454 self.tags
455 .get(CONTEXT_TAG_REQUEST_ID)
456 .map(String::as_str)
457 .map(str::trim)
458 .filter(|value| !value.is_empty())
459 }
460
461 pub fn timeout_ms(&self) -> Option<u64> {
462 self.tags
463 .get(CONTEXT_TAG_TIMEOUT_MS)
464 .and_then(|value| value.parse::<u64>().ok())
465 .filter(|value| *value > 0)
466 }
467
468 pub fn retry_policy_override(&self) -> Option<RetryPolicy> {
469 let max_attempts = self
470 .tags
471 .get(CONTEXT_TAG_RETRY_MAX_ATTEMPTS)
472 .and_then(|value| value.parse::<u32>().ok())?;
473 let initial_backoff_ms = self
474 .tags
475 .get(CONTEXT_TAG_RETRY_INITIAL_BACKOFF_MS)
476 .and_then(|value| value.parse::<u64>().ok())
477 .unwrap_or(0);
478 let max_backoff_ms = self
479 .tags
480 .get(CONTEXT_TAG_RETRY_MAX_BACKOFF_MS)
481 .and_then(|value| value.parse::<u64>().ok())
482 .unwrap_or(initial_backoff_ms);
483 Some(RetryPolicy {
484 max_attempts: max_attempts.max(1),
485 initial_backoff_ms,
486 max_backoff_ms: max_backoff_ms.max(initial_backoff_ms),
487 })
488 }
489}
490
491pub trait TypedQueryBoundary {
492 type Request: Send + Sync;
493 type Response: Send + Sync;
494
495 fn execute(
496 &self,
497 request: &Self::Request,
498 context: &QueryContext,
499 ) -> IntegrationResult<Self::Response>;
500}
501
502#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
503pub struct SqlCommand {
504 pub statement: String,
505 pub params: Vec<Value>,
506}
507
508impl SqlCommand {
509 pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
510 Self {
511 statement: statement.into(),
512 params,
513 }
514 }
515}
516
517#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub struct DataWindowRequest {
519 pub dataset: String,
520 pub offset: usize,
521 pub limit: usize,
522 #[serde(default, skip_serializing_if = "Option::is_none")]
523 pub query_token: Option<String>,
524 #[serde(default, skip_serializing_if = "Option::is_none")]
525 pub window_token: Option<String>,
526 #[serde(default)]
527 pub wire_format: WireFormatProfile,
528}
529
530impl DataWindowRequest {
531 pub fn new(dataset: impl Into<String>, offset: usize, limit: usize) -> Self {
532 Self {
533 dataset: dataset.into(),
534 offset,
535 limit: limit.max(1),
536 query_token: None,
537 window_token: None,
538 wire_format: WireFormatProfile::Json,
539 }
540 }
541
542 pub fn with_query_token(mut self, query_token: impl Into<String>) -> Self {
543 self.query_token = Some(query_token.into());
544 self
545 }
546
547 pub fn with_window_token(mut self, window_token: impl Into<String>) -> Self {
548 self.window_token = Some(window_token.into());
549 self
550 }
551
552 pub fn compact(mut self) -> Self {
553 self.wire_format = WireFormatProfile::Compact;
554 self
555 }
556}
557
558#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
559pub struct DataWindowResponse {
560 pub dataset: String,
561 pub offset: usize,
562 pub limit: usize,
563 pub total_rows: usize,
564 pub query_token: String,
565 pub window_token: String,
566 pub rows: Vec<Row>,
567 #[serde(default, skip_serializing_if = "Option::is_none")]
568 pub compact_rows: Option<CompactRowsPayload>,
569}
570
571pub trait SingleStoreAdapter:
572 TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
573{
574 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
575 let started_at = Instant::now();
576 let statement = query.statement.as_str();
577 let param_count = query.params.len();
578 let result = self.execute(&query, context);
579 match &result {
580 Ok(rows) => info!(
581 target: "shelly.integration.query",
582 source = "singlestore",
583 operation = "run_query",
584 tenant_id = ?context.tenant_id,
585 trace_id = ?context.trace_id,
586 tag_count = context.tags.len(),
587 statement,
588 param_count,
589 row_count = rows.len(),
590 duration_ms = started_at.elapsed().as_millis() as u64,
591 "Shelly integration query executed"
592 ),
593 Err(err) => warn!(
594 target: "shelly.integration.query",
595 source = "singlestore",
596 operation = "run_query",
597 tenant_id = ?context.tenant_id,
598 trace_id = ?context.trace_id,
599 tag_count = context.tags.len(),
600 statement,
601 param_count,
602 duration_ms = started_at.elapsed().as_millis() as u64,
603 error = %err,
604 "Shelly integration query failed"
605 ),
606 }
607 result
608 }
609
610 fn run_high_volume_window_query(
611 &self,
612 query: SqlCommand,
613 request: DataWindowRequest,
614 context: &QueryContext,
615 ) -> IntegrationResult<DataWindowResponse> {
616 let rows = self.run_query(query, context)?;
617 Ok(materialize_window_response(
618 "singlestore",
619 request,
620 rows,
621 None,
622 ))
623 }
624}
625
626pub trait ClickHouseAdapter: TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> {
627 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
628 let started_at = Instant::now();
629 let statement = query.statement.as_str();
630 let param_count = query.params.len();
631 let result = self.execute(&query, context);
632 match &result {
633 Ok(rows) => info!(
634 target: "shelly.integration.query",
635 source = "clickhouse",
636 operation = "run_query",
637 tenant_id = ?context.tenant_id,
638 trace_id = ?context.trace_id,
639 correlation_id = context.correlation_id().unwrap_or("-"),
640 request_id = context.request_id().unwrap_or("-"),
641 tag_count = context.tags.len(),
642 statement,
643 param_count,
644 row_count = rows.len(),
645 duration_ms = started_at.elapsed().as_millis() as u64,
646 "Shelly integration query executed"
647 ),
648 Err(err) => warn!(
649 target: "shelly.integration.query",
650 source = "clickhouse",
651 operation = "run_query",
652 tenant_id = ?context.tenant_id,
653 trace_id = ?context.trace_id,
654 correlation_id = context.correlation_id().unwrap_or("-"),
655 request_id = context.request_id().unwrap_or("-"),
656 tag_count = context.tags.len(),
657 statement,
658 param_count,
659 duration_ms = started_at.elapsed().as_millis() as u64,
660 error = %err,
661 "Shelly integration query failed"
662 ),
663 }
664 result
665 }
666
667 fn run_high_volume_window_query(
668 &self,
669 query: SqlCommand,
670 request: DataWindowRequest,
671 context: &QueryContext,
672 ) -> IntegrationResult<DataWindowResponse> {
673 let rows = self.run_query(query, context)?;
674 Ok(materialize_window_response(
675 "clickhouse",
676 request,
677 rows,
678 None,
679 ))
680 }
681}
682
683pub trait BigQueryAdapter: TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>> {
684 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
685 let started_at = Instant::now();
686 let statement = query.statement.as_str();
687 let param_count = query.params.len();
688 let result = self.execute(&query, context);
689 match &result {
690 Ok(rows) => info!(
691 target: "shelly.integration.query",
692 source = "bigquery",
693 operation = "run_query",
694 tenant_id = ?context.tenant_id,
695 trace_id = ?context.trace_id,
696 correlation_id = context.correlation_id().unwrap_or("-"),
697 request_id = context.request_id().unwrap_or("-"),
698 tag_count = context.tags.len(),
699 statement,
700 param_count,
701 row_count = rows.len(),
702 duration_ms = started_at.elapsed().as_millis() as u64,
703 "Shelly integration query executed"
704 ),
705 Err(err) => warn!(
706 target: "shelly.integration.query",
707 source = "bigquery",
708 operation = "run_query",
709 tenant_id = ?context.tenant_id,
710 trace_id = ?context.trace_id,
711 correlation_id = context.correlation_id().unwrap_or("-"),
712 request_id = context.request_id().unwrap_or("-"),
713 tag_count = context.tags.len(),
714 statement,
715 param_count,
716 duration_ms = started_at.elapsed().as_millis() as u64,
717 error = %err,
718 "Shelly integration query failed"
719 ),
720 }
721 result
722 }
723
724 fn run_high_volume_window_query(
725 &self,
726 query: SqlCommand,
727 request: DataWindowRequest,
728 context: &QueryContext,
729 ) -> IntegrationResult<DataWindowResponse> {
730 let rows = self.run_query(query, context)?;
731 Ok(materialize_window_response("bigquery", request, rows, None))
732 }
733}
734
735#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
736pub struct SearchRequest {
737 pub index: String,
738 pub text: String,
739 pub filters: Vec<crate::query::Filter>,
740 pub page: usize,
741 pub per_page: usize,
742}
743
744impl SearchRequest {
745 pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
746 Self {
747 index: index.into(),
748 text: text.into(),
749 filters: Vec::new(),
750 page: 1,
751 per_page: 25,
752 }
753 }
754
755 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
756 self.filters.push(filter);
757 self
758 }
759}
760
761#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
762pub struct SearchResponse {
763 pub total_hits: usize,
764 pub rows: Vec<StoredRow>,
765}
766
767pub trait OpenSearchAdapter:
768 TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
769{
770 fn search(
771 &self,
772 request: SearchRequest,
773 context: &QueryContext,
774 ) -> IntegrationResult<SearchResponse> {
775 let started_at = Instant::now();
776 let index = request.index.as_str();
777 let text = request.text.as_str();
778 let filter_count = request.filters.len();
779 let page = request.page;
780 let per_page = request.per_page;
781 let result = self.execute(&request, context);
782 match &result {
783 Ok(response) => info!(
784 target: "shelly.integration.query",
785 source = "opensearch",
786 operation = "search",
787 tenant_id = ?context.tenant_id,
788 trace_id = ?context.trace_id,
789 tag_count = context.tags.len(),
790 index,
791 text,
792 filter_count,
793 page,
794 per_page,
795 row_count = response.rows.len(),
796 total_hits = response.total_hits,
797 duration_ms = started_at.elapsed().as_millis() as u64,
798 "Shelly integration query executed"
799 ),
800 Err(err) => warn!(
801 target: "shelly.integration.query",
802 source = "opensearch",
803 operation = "search",
804 tenant_id = ?context.tenant_id,
805 trace_id = ?context.trace_id,
806 tag_count = context.tags.len(),
807 index,
808 text,
809 filter_count,
810 page,
811 per_page,
812 duration_ms = started_at.elapsed().as_millis() as u64,
813 error = %err,
814 "Shelly integration query failed"
815 ),
816 }
817 result
818 }
819
820 fn search_window(
821 &self,
822 mut request: SearchRequest,
823 window: DataWindowRequest,
824 context: &QueryContext,
825 ) -> IntegrationResult<DataWindowResponse> {
826 let limit = window.limit.max(1);
827 request.page = (window.offset / limit).saturating_add(1);
828 request.per_page = limit;
829 let response = self.search(request, context)?;
830 let rows = response
831 .rows
832 .into_iter()
833 .map(|row| row.data)
834 .collect::<Vec<_>>();
835 let query_token = window
836 .query_token
837 .unwrap_or_else(|| format!("opensearch:{}:{}", window.dataset, response.total_hits));
838 let window_token = window.window_token.unwrap_or_else(|| {
839 format!(
840 "{query_token}:offset={}:limit={limit}:rows={}",
841 window.offset,
842 rows.len()
843 )
844 });
845 let compact_rows =
846 (window.wire_format == WireFormatProfile::Compact).then(|| encode_compact_rows(&rows));
847 Ok(DataWindowResponse {
848 dataset: window.dataset,
849 offset: window.offset,
850 limit,
851 total_rows: response.total_hits,
852 query_token,
853 window_token,
854 rows,
855 compact_rows,
856 })
857 }
858}
859
860pub trait AnalyticsSink: Send + Sync {
861 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
862}
863
864#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
865pub struct AnalyticsEvent {
866 pub namespace: String,
867 pub name: String,
868 pub payload: Value,
869}
870
871impl AnalyticsEvent {
872 pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
873 Self {
874 namespace: namespace.into(),
875 name: name.into(),
876 payload,
877 }
878 }
879}
880
881#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
882pub struct JobRequest {
883 pub workflow: String,
884 pub payload: Value,
885 pub idempotency_key: String,
886 pub metadata: BTreeMap<String, String>,
887}
888
889impl JobRequest {
890 pub fn new(
891 workflow: impl Into<String>,
892 payload: Value,
893 idempotency_key: impl Into<String>,
894 ) -> Self {
895 Self {
896 workflow: workflow.into(),
897 payload,
898 idempotency_key: idempotency_key.into(),
899 metadata: BTreeMap::new(),
900 }
901 }
902}
903
904#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
905#[serde(rename_all = "snake_case")]
906pub enum JobState {
907 Queued,
908 Running,
909 Succeeded,
910 Failed,
911 Canceled,
912}
913
914#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
915pub struct JobStatus {
916 pub id: String,
917 pub state: JobState,
918 pub attempts: u32,
919 pub result: Option<Value>,
920 pub error: Option<IntegrationError>,
921}
922
923#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
924pub struct JobHandle {
925 pub id: String,
926 pub workflow: String,
927 pub idempotency_key: String,
928}
929
930pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
931
932pub trait JobOrchestrator: Send + Sync {
933 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
934 fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
935 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
936 fn register_completion_callback(&self, callback: JobCompletionCallback);
937}
938
939pub trait TriggerDevAdapter: Send + Sync {
940 fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
941 fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus>;
942 fn poll_workflow(
943 &self,
944 id: &str,
945 attempts: u32,
946 backoff_ms: u64,
947 ) -> IntegrationResult<JobStatus>;
948}
949
950impl<T> TriggerDevAdapter for T
951where
952 T: JobOrchestrator + Send + Sync,
953{
954 fn trigger_workflow(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
955 self.enqueue(request)
956 }
957
958 fn workflow_status(&self, id: &str) -> IntegrationResult<JobStatus> {
959 self.status(id)
960 }
961
962 fn poll_workflow(
963 &self,
964 id: &str,
965 attempts: u32,
966 backoff_ms: u64,
967 ) -> IntegrationResult<JobStatus> {
968 self.poll(id, attempts, backoff_ms)
969 }
970}
971
972#[derive(Clone)]
973pub struct AxiomTelemetryBridge<S: AnalyticsSink> {
974 sink: Arc<S>,
975 namespace: String,
976}
977
978impl<S: AnalyticsSink> AxiomTelemetryBridge<S> {
979 pub fn new(sink: Arc<S>, namespace: impl Into<String>) -> Self {
980 Self {
981 sink,
982 namespace: namespace.into(),
983 }
984 }
985
986 pub fn emit(
987 &self,
988 name: impl Into<String>,
989 payload: Value,
990 context: &QueryContext,
991 ) -> IntegrationResult<()> {
992 let mut envelope = serde_json::Map::new();
993 envelope.insert("payload".to_string(), payload);
994 envelope.insert(
995 "tenant_id".to_string(),
996 context
997 .tenant_id
998 .as_ref()
999 .map(|value| Value::String(value.clone()))
1000 .unwrap_or(Value::Null),
1001 );
1002 envelope.insert(
1003 "trace_id".to_string(),
1004 context
1005 .trace_id
1006 .as_ref()
1007 .map(|value| Value::String(value.clone()))
1008 .unwrap_or(Value::Null),
1009 );
1010 envelope.insert(
1011 "correlation_id".to_string(),
1012 context
1013 .correlation_id()
1014 .map(|value| Value::String(value.to_string()))
1015 .unwrap_or(Value::Null),
1016 );
1017 envelope.insert(
1018 "request_id".to_string(),
1019 context
1020 .request_id()
1021 .map(|value| Value::String(value.to_string()))
1022 .unwrap_or(Value::Null),
1023 );
1024 envelope.insert("tags".to_string(), serde_json::json!(context.tags));
1025
1026 self.sink.send_event(AnalyticsEvent::new(
1027 self.namespace.clone(),
1028 name.into(),
1029 Value::Object(envelope),
1030 ))
1031 }
1032}
1033
1034#[derive(Debug, Clone, Default)]
1035pub struct InMemorySingleStoreAdapter {
1036 rows: Vec<Row>,
1037}
1038
1039impl InMemorySingleStoreAdapter {
1040 pub fn new(rows: Vec<Row>) -> Self {
1041 Self { rows }
1042 }
1043}
1044
1045impl TypedQueryBoundary for InMemorySingleStoreAdapter {
1046 type Request = SqlCommand;
1047 type Response = Vec<Row>;
1048
1049 fn execute(
1050 &self,
1051 request: &Self::Request,
1052 _context: &QueryContext,
1053 ) -> IntegrationResult<Self::Response> {
1054 if request.statement.trim().is_empty() {
1055 return Err(IntegrationError::new(
1056 "singlestore",
1057 IntegrationErrorKind::InvalidInput,
1058 "empty SQL statement",
1059 )
1060 .with_code("empty_statement"));
1061 }
1062 Ok(self.rows.clone())
1063 }
1064}
1065
1066impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
1067
1068#[derive(Debug, Clone, Default)]
1069pub struct InMemoryClickHouseAdapter {
1070 rows: Vec<Row>,
1071}
1072
1073impl InMemoryClickHouseAdapter {
1074 pub fn new(rows: Vec<Row>) -> Self {
1075 Self { rows }
1076 }
1077}
1078
1079impl TypedQueryBoundary for InMemoryClickHouseAdapter {
1080 type Request = SqlCommand;
1081 type Response = Vec<Row>;
1082
1083 fn execute(
1084 &self,
1085 request: &Self::Request,
1086 _context: &QueryContext,
1087 ) -> IntegrationResult<Self::Response> {
1088 if request.statement.trim().is_empty() {
1089 return Err(IntegrationError::new(
1090 "clickhouse",
1091 IntegrationErrorKind::InvalidInput,
1092 "empty SQL statement",
1093 )
1094 .with_code("empty_statement"));
1095 }
1096 Ok(self.rows.clone())
1097 }
1098}
1099
1100impl ClickHouseAdapter for InMemoryClickHouseAdapter {}
1101
1102#[derive(Debug, Clone, Default)]
1103pub struct InMemoryBigQueryAdapter {
1104 rows: Vec<Row>,
1105}
1106
1107impl InMemoryBigQueryAdapter {
1108 pub fn new(rows: Vec<Row>) -> Self {
1109 Self { rows }
1110 }
1111}
1112
1113impl TypedQueryBoundary for InMemoryBigQueryAdapter {
1114 type Request = SqlCommand;
1115 type Response = Vec<Row>;
1116
1117 fn execute(
1118 &self,
1119 request: &Self::Request,
1120 _context: &QueryContext,
1121 ) -> IntegrationResult<Self::Response> {
1122 if request.statement.trim().is_empty() {
1123 return Err(IntegrationError::new(
1124 "bigquery",
1125 IntegrationErrorKind::InvalidInput,
1126 "empty SQL statement",
1127 )
1128 .with_code("empty_statement"));
1129 }
1130 Ok(self.rows.clone())
1131 }
1132}
1133
1134impl BigQueryAdapter for InMemoryBigQueryAdapter {}
1135
1136#[derive(Debug, Clone, Default)]
1137pub struct InMemoryOpenSearchAdapter {
1138 rows: Vec<StoredRow>,
1139}
1140
1141impl InMemoryOpenSearchAdapter {
1142 pub fn new(rows: Vec<StoredRow>) -> Self {
1143 Self { rows }
1144 }
1145}
1146
1147impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
1148 type Request = SearchRequest;
1149 type Response = SearchResponse;
1150
1151 fn execute(
1152 &self,
1153 request: &Self::Request,
1154 _context: &QueryContext,
1155 ) -> IntegrationResult<Self::Response> {
1156 if request.index.trim().is_empty() {
1157 return Err(IntegrationError::new(
1158 "opensearch",
1159 IntegrationErrorKind::InvalidInput,
1160 "search index must not be empty",
1161 )
1162 .with_code("empty_index"));
1163 }
1164 let needle = request.text.trim().to_lowercase();
1165 let mut filtered = self
1166 .rows
1167 .iter()
1168 .filter(|row| {
1169 if needle.is_empty() {
1170 return true;
1171 }
1172 row.data.values().any(|value| {
1173 value
1174 .as_str()
1175 .map(|text| text.to_lowercase().contains(&needle))
1176 .unwrap_or(false)
1177 })
1178 })
1179 .cloned()
1180 .collect::<Vec<_>>();
1181 let total_hits = filtered.len();
1182 let page = request.page.max(1);
1183 let per_page = request.per_page.max(1);
1184 let offset = (page - 1) * per_page;
1185 filtered = filtered.into_iter().skip(offset).take(per_page).collect();
1186 Ok(SearchResponse {
1187 total_hits,
1188 rows: filtered,
1189 })
1190 }
1191}
1192
1193impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
1194
1195#[derive(Debug, Default, Clone)]
1196pub struct InMemoryAxiomSink {
1197 events: Arc<Mutex<Vec<AnalyticsEvent>>>,
1198}
1199
1200impl InMemoryAxiomSink {
1201 pub fn events(&self) -> Vec<AnalyticsEvent> {
1202 self.events
1203 .lock()
1204 .map(|events| events.clone())
1205 .unwrap_or_default()
1206 }
1207}
1208
1209impl AnalyticsSink for InMemoryAxiomSink {
1210 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
1211 let started_at = Instant::now();
1212 let namespace = event.namespace.clone();
1213 let name = event.name.clone();
1214 self.events
1215 .lock()
1216 .map_err(|_| {
1217 IntegrationError::new(
1218 "axiom",
1219 IntegrationErrorKind::Unavailable,
1220 "analytics sink lock poisoned",
1221 )
1222 })?
1223 .push(event);
1224 info!(
1225 target: "shelly.integration.query",
1226 source = "axiom",
1227 operation = "send_event",
1228 namespace,
1229 event_name = name,
1230 duration_ms = started_at.elapsed().as_millis() as u64,
1231 "Shelly integration call executed"
1232 );
1233 Ok(())
1234 }
1235}
1236
1237#[derive(Default)]
1238pub struct InMemoryJobOrchestrator {
1239 statuses: Mutex<HashMap<String, JobStatus>>,
1240 callbacks: Mutex<Vec<JobCompletionCallback>>,
1241 next_id: Mutex<u64>,
1242}
1243
1244impl InMemoryJobOrchestrator {
1245 pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
1246 let status = self.with_status_mut(id, |status| {
1247 status.state = JobState::Succeeded;
1248 status.result = Some(result);
1249 status.error = None;
1250 })?;
1251 self.notify(&status);
1252 Ok(())
1253 }
1254
1255 pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
1256 let status = self.with_status_mut(id, |status| {
1257 status.state = JobState::Failed;
1258 status.result = None;
1259 status.error = Some(error);
1260 })?;
1261 self.notify(&status);
1262 Ok(())
1263 }
1264
1265 fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
1266 where
1267 F: FnOnce(&mut JobStatus),
1268 {
1269 let mut statuses = self.statuses.lock().map_err(|_| {
1270 IntegrationError::new(
1271 "trigger",
1272 IntegrationErrorKind::Unavailable,
1273 "job status lock poisoned",
1274 )
1275 })?;
1276 let Some(status) = statuses.get_mut(id) else {
1277 return Err(IntegrationError::new(
1278 "trigger",
1279 IntegrationErrorKind::InvalidInput,
1280 "job not found",
1281 )
1282 .with_code("job_not_found"));
1283 };
1284 status.attempts = status.attempts.saturating_add(1);
1285 update(status);
1286 Ok(status.clone())
1287 }
1288
1289 fn notify(&self, status: &JobStatus) {
1290 if let Ok(callbacks) = self.callbacks.lock() {
1291 for callback in callbacks.iter() {
1292 callback(status);
1293 }
1294 }
1295 }
1296
1297 fn is_terminal(state: JobState) -> bool {
1298 matches!(
1299 state,
1300 JobState::Succeeded | JobState::Failed | JobState::Canceled
1301 )
1302 }
1303}
1304
1305impl JobOrchestrator for InMemoryJobOrchestrator {
1306 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
1307 let started_at = Instant::now();
1308 let workflow = request.workflow.clone();
1309 let idempotency_key = request.idempotency_key.clone();
1310 let mut next_id = self.next_id.lock().map_err(|_| {
1311 IntegrationError::new(
1312 "trigger",
1313 IntegrationErrorKind::Unavailable,
1314 "job id lock poisoned",
1315 )
1316 })?;
1317 *next_id = next_id.saturating_add(1);
1318 let id = format!("job-{next_id}");
1319 let status = JobStatus {
1320 id: id.clone(),
1321 state: JobState::Queued,
1322 attempts: 0,
1323 result: None,
1324 error: None,
1325 };
1326 self.statuses
1327 .lock()
1328 .map_err(|_| {
1329 IntegrationError::new(
1330 "trigger",
1331 IntegrationErrorKind::Unavailable,
1332 "job status lock poisoned",
1333 )
1334 })?
1335 .insert(id.clone(), status);
1336 let handle = JobHandle {
1337 id,
1338 workflow: request.workflow,
1339 idempotency_key: request.idempotency_key,
1340 };
1341 info!(
1342 target: "shelly.integration.query",
1343 source = "trigger",
1344 operation = "enqueue",
1345 workflow,
1346 idempotency_key,
1347 job_id = handle.id.as_str(),
1348 duration_ms = started_at.elapsed().as_millis() as u64,
1349 "Shelly integration call executed"
1350 );
1351 Ok(handle)
1352 }
1353
1354 fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
1355 let started_at = Instant::now();
1356 let result = self
1357 .statuses
1358 .lock()
1359 .map_err(|_| {
1360 IntegrationError::new(
1361 "trigger",
1362 IntegrationErrorKind::Unavailable,
1363 "job status lock poisoned",
1364 )
1365 })?
1366 .get(id)
1367 .cloned()
1368 .ok_or_else(|| {
1369 IntegrationError::new(
1370 "trigger",
1371 IntegrationErrorKind::InvalidInput,
1372 "job not found",
1373 )
1374 .with_code("job_not_found")
1375 });
1376 match &result {
1377 Ok(status) => info!(
1378 target: "shelly.integration.query",
1379 source = "trigger",
1380 operation = "status",
1381 job_id = id,
1382 job_state = ?status.state,
1383 attempts = status.attempts,
1384 duration_ms = started_at.elapsed().as_millis() as u64,
1385 "Shelly integration call executed"
1386 ),
1387 Err(err) => warn!(
1388 target: "shelly.integration.query",
1389 source = "trigger",
1390 operation = "status",
1391 job_id = id,
1392 duration_ms = started_at.elapsed().as_millis() as u64,
1393 error = %err,
1394 "Shelly integration call failed"
1395 ),
1396 }
1397 result
1398 }
1399
1400 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
1401 let started_at = Instant::now();
1402 let attempts = attempts.max(1);
1403 let mut polled = 0u32;
1404 for current in 1..=attempts {
1405 let status = self.status(id)?;
1406 polled = current;
1407 if Self::is_terminal(status.state) || current == attempts {
1408 info!(
1409 target: "shelly.integration.query",
1410 source = "trigger",
1411 operation = "poll",
1412 job_id = id,
1413 attempts = polled,
1414 backoff_ms,
1415 terminal = Self::is_terminal(status.state),
1416 job_state = ?status.state,
1417 duration_ms = started_at.elapsed().as_millis() as u64,
1418 "Shelly integration call executed"
1419 );
1420 return Ok(status);
1421 }
1422 if backoff_ms > 0 {
1423 thread::sleep(Duration::from_millis(backoff_ms));
1424 }
1425 }
1426
1427 let result = self.status(id);
1428 match &result {
1429 Ok(status) => info!(
1430 target: "shelly.integration.query",
1431 source = "trigger",
1432 operation = "poll",
1433 job_id = id,
1434 attempts = polled,
1435 backoff_ms,
1436 terminal = Self::is_terminal(status.state),
1437 job_state = ?status.state,
1438 duration_ms = started_at.elapsed().as_millis() as u64,
1439 "Shelly integration call executed"
1440 ),
1441 Err(err) => warn!(
1442 target: "shelly.integration.query",
1443 source = "trigger",
1444 operation = "poll",
1445 job_id = id,
1446 attempts = polled,
1447 backoff_ms,
1448 duration_ms = started_at.elapsed().as_millis() as u64,
1449 error = %err,
1450 "Shelly integration call failed"
1451 ),
1452 }
1453 result
1454 }
1455
1456 fn register_completion_callback(&self, callback: JobCompletionCallback) {
1457 if let Ok(mut callbacks) = self.callbacks.lock() {
1458 callbacks.push(callback);
1459 info!(
1460 target: "shelly.integration.query",
1461 source = "trigger",
1462 operation = "register_completion_callback",
1463 callback_count = callbacks.len(),
1464 "Shelly integration callback registered"
1465 );
1466 } else {
1467 warn!(
1468 target: "shelly.integration.query",
1469 source = "trigger",
1470 operation = "register_completion_callback",
1471 "Shelly integration callback registration failed"
1472 );
1473 }
1474 }
1475}
1476
1477#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1478pub struct AdapterConformanceCheck {
1479 pub name: String,
1480 pub passed: bool,
1481 pub detail: String,
1482}
1483
1484#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
1485pub struct AdapterConformanceReport {
1486 pub checks: Vec<AdapterConformanceCheck>,
1487}
1488
1489impl AdapterConformanceReport {
1490 pub fn passed(&self) -> bool {
1491 self.checks.iter().all(|check| check.passed)
1492 }
1493
1494 pub fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1495 self.checks.push(AdapterConformanceCheck {
1496 name: name.into(),
1497 passed: true,
1498 detail: detail.into(),
1499 });
1500 }
1501
1502 pub fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
1503 self.checks.push(AdapterConformanceCheck {
1504 name: name.into(),
1505 passed: false,
1506 detail: detail.into(),
1507 });
1508 }
1509}
1510
1511pub fn run_adapter_conformance_suite(
1512 singlestore: &dyn SingleStoreAdapter,
1513 clickhouse: &dyn ClickHouseAdapter,
1514 bigquery: &dyn BigQueryAdapter,
1515 opensearch: &dyn OpenSearchAdapter,
1516 trigger: &dyn TriggerDevAdapter,
1517 analytics: &dyn AnalyticsSink,
1518 context: &QueryContext,
1519) -> AdapterConformanceReport {
1520 let mut report = AdapterConformanceReport::default();
1521
1522 match singlestore.run_query(
1523 SqlCommand::new(
1524 "SELECT tenant, active_accounts FROM tenant_rollup",
1525 Vec::new(),
1526 ),
1527 context,
1528 ) {
1529 Ok(rows) => report.add_pass("singlestore.query", format!("rows={}", rows.len())),
1530 Err(err) => report.add_fail("singlestore.query", err.to_string()),
1531 }
1532
1533 match clickhouse.run_query(
1534 SqlCommand::new("SELECT account, events FROM account_events", Vec::new()),
1535 context,
1536 ) {
1537 Ok(rows) => report.add_pass("clickhouse.query", format!("rows={}", rows.len())),
1538 Err(err) => report.add_fail("clickhouse.query", err.to_string()),
1539 }
1540
1541 match bigquery.run_query(
1542 SqlCommand::new("SELECT account, arr FROM analytics_rollup", Vec::new()),
1543 context,
1544 ) {
1545 Ok(rows) => report.add_pass("bigquery.query", format!("rows={}", rows.len())),
1546 Err(err) => report.add_fail("bigquery.query", err.to_string()),
1547 }
1548
1549 match opensearch.search(SearchRequest::new("accounts", ""), context) {
1550 Ok(response) => report.add_pass(
1551 "opensearch.search",
1552 format!(
1553 "rows={} total_hits={}",
1554 response.rows.len(),
1555 response.total_hits
1556 ),
1557 ),
1558 Err(err) => report.add_fail("opensearch.search", err.to_string()),
1559 }
1560
1561 match trigger.trigger_workflow(JobRequest::new(
1562 "adapter_conformance_probe",
1563 serde_json::json!({"probe": true}),
1564 "adapter-conformance-probe",
1565 )) {
1566 Ok(handle) => {
1567 report.add_pass("trigger.enqueue", format!("id={}", handle.id));
1568 match trigger.workflow_status(&handle.id) {
1569 Ok(status) => report.add_pass(
1570 "trigger.status",
1571 format!("state={:?} attempts={}", status.state, status.attempts),
1572 ),
1573 Err(err) => report.add_fail("trigger.status", err.to_string()),
1574 }
1575 }
1576 Err(err) => report.add_fail("trigger.enqueue", err.to_string()),
1577 }
1578
1579 let analytics_event = AnalyticsEvent::new(
1580 "adapter_conformance",
1581 "probe",
1582 serde_json::json!({
1583 "trace_id": context.trace_id,
1584 "correlation_id": context.correlation_id(),
1585 "request_id": context.request_id(),
1586 }),
1587 );
1588 match analytics.send_event(analytics_event) {
1589 Ok(()) => report.add_pass("analytics.emit", "event accepted"),
1590 Err(err) => report.add_fail("analytics.emit", err.to_string()),
1591 }
1592
1593 let context_correlation_check =
1594 context.correlation_id().is_some() && context.request_id().is_some();
1595 if context_correlation_check {
1596 report.add_pass(
1597 "context.correlation",
1598 "correlation_id and request_id present",
1599 );
1600 } else {
1601 report.add_fail(
1602 "context.correlation",
1603 "missing correlation_id/request_id; set QueryContext::with_correlation_id + with_request_id",
1604 );
1605 }
1606
1607 let retry_override_context = context.clone().with_retry_policy(RetryPolicy {
1608 max_attempts: 2,
1609 initial_backoff_ms: 0,
1610 max_backoff_ms: 0,
1611 });
1612 let mut retry_attempts = 0u32;
1613 match run_with_contract(
1614 "conformance",
1615 "retry_override_probe",
1616 AdapterCallContract::default_query(),
1617 &retry_override_context,
1618 |attempt| {
1619 retry_attempts = attempt;
1620 if attempt == 1 {
1621 Err(IntegrationError::new(
1622 "conformance",
1623 IntegrationErrorKind::Transient,
1624 "transient probe failure",
1625 ))
1626 } else {
1627 Ok("ok")
1628 }
1629 },
1630 ) {
1631 Ok(_) if retry_attempts == 2 => report.add_pass(
1632 "contract.retry_override",
1633 "retry override honored with bounded attempts",
1634 ),
1635 Ok(_) => report.add_fail(
1636 "contract.retry_override",
1637 format!("unexpected attempts={retry_attempts} expected=2"),
1638 ),
1639 Err(err) => report.add_fail("contract.retry_override", err.to_string()),
1640 }
1641
1642 let timeout_context = context.clone().with_timeout_ms(1);
1643 match run_with_contract(
1644 "conformance",
1645 "timeout_probe",
1646 AdapterCallContract::default_query(),
1647 &timeout_context,
1648 |_| {
1649 thread::sleep(Duration::from_millis(5));
1650 Ok::<_, IntegrationError>("done")
1651 },
1652 ) {
1653 Err(err) if err.kind == IntegrationErrorKind::Timeout => {
1654 report.add_pass("contract.timeout", "timeout classification validated")
1655 }
1656 Err(err) => report.add_fail("contract.timeout", err.to_string()),
1657 Ok(_) => report.add_fail(
1658 "contract.timeout",
1659 "expected timeout but operation returned success",
1660 ),
1661 }
1662
1663 report
1664}
1665
1666pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
1667 DataError::Integration(format!("[{}] {}", source.into(), err))
1668}
1669
1670pub fn map_integration_result<T>(
1671 source: impl Into<String>,
1672 result: IntegrationResult<T>,
1673) -> DataResult<T> {
1674 result.map_err(|err| map_integration_error(source, err))
1675}
1676
1677pub fn query_from_search(request: &SearchRequest) -> Query {
1678 let mut query = Query::new().paginate(request.page, request.per_page);
1679 for filter in &request.filters {
1680 query = query.where_filter(filter.clone());
1681 }
1682 query
1683}
1684
1685fn materialize_window_response(
1686 source: &str,
1687 request: DataWindowRequest,
1688 rows: Vec<Row>,
1689 total_rows_hint: Option<usize>,
1690) -> DataWindowResponse {
1691 let total_rows = total_rows_hint.unwrap_or(rows.len());
1692 let offset = request.offset.min(total_rows);
1693 let limit = request.limit.max(1);
1694 let window_rows = rows
1695 .into_iter()
1696 .skip(offset)
1697 .take(limit)
1698 .collect::<Vec<_>>();
1699 let query_token = request
1700 .query_token
1701 .unwrap_or_else(|| format!("{source}:{}:{}", request.dataset, total_rows));
1702 let window_token = request.window_token.unwrap_or_else(|| {
1703 format!(
1704 "{query_token}:offset={offset}:limit={limit}:rows={}",
1705 window_rows.len()
1706 )
1707 });
1708 let compact_rows = (request.wire_format == WireFormatProfile::Compact)
1709 .then(|| encode_compact_rows(&window_rows));
1710
1711 DataWindowResponse {
1712 dataset: request.dataset,
1713 offset,
1714 limit,
1715 total_rows,
1716 query_token,
1717 window_token,
1718 rows: window_rows,
1719 compact_rows,
1720 }
1721}
1722
1723fn encode_compact_rows(rows: &[Row]) -> CompactRowsPayload {
1724 let mut columns: Vec<String> = Vec::new();
1725 for row in rows {
1726 for key in row.keys() {
1727 if !columns.iter().any(|column| column == key) {
1728 columns.push(key.clone());
1729 }
1730 }
1731 }
1732
1733 let encoded_rows = rows
1734 .iter()
1735 .map(|row| {
1736 columns
1737 .iter()
1738 .map(|column| row.get(column).cloned().unwrap_or(Value::Null))
1739 .collect::<Vec<_>>()
1740 })
1741 .collect::<Vec<_>>();
1742
1743 CompactRowsPayload {
1744 columns,
1745 rows: encoded_rows,
1746 }
1747}
1748
1749#[cfg(test)]
1750mod tests {
1751 use super::{
1752 map_integration_result, run_adapter_conformance_suite, run_with_contract,
1753 run_with_contract_async, run_with_retry, run_with_retry_async, AdapterCallContract,
1754 AnalyticsEvent, AnalyticsSink, AxiomTelemetryBridge, BigQueryAdapter, ClickHouseAdapter,
1755 ConnectionLifecycle, ConnectionLifecycleHook, DataWindowRequest, InMemoryAxiomSink,
1756 InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryJobOrchestrator,
1757 InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
1758 IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
1759 LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
1760 SearchResponse, SingleStoreAdapter, SqlCommand, TriggerDevAdapter, TypedQueryBoundary,
1761 CONTEXT_TAG_CORRELATION_ID, CONTEXT_TAG_REQUEST_ID, CONTEXT_TAG_TIMEOUT_MS,
1762 };
1763 use serde_json::{json, Value};
1764 use std::sync::{Arc, Mutex};
1765
1766 struct CountingHook {
1767 connects: Arc<Mutex<u32>>,
1768 disconnects: Arc<Mutex<u32>>,
1769 }
1770
1771 impl ConnectionLifecycleHook for CountingHook {
1772 fn on_connect(&self) -> super::IntegrationResult<()> {
1773 let mut guard = self.connects.lock().unwrap();
1774 *guard += 1;
1775 Ok(())
1776 }
1777
1778 fn on_disconnect(&self) -> super::IntegrationResult<()> {
1779 let mut guard = self.disconnects.lock().unwrap();
1780 *guard += 1;
1781 Ok(())
1782 }
1783 }
1784
1785 #[test]
1786 fn lifecycle_hooks_are_called() {
1787 let connects = Arc::new(Mutex::new(0));
1788 let disconnects = Arc::new(Mutex::new(0));
1789 let mut lifecycle = LifecycleHooks::new();
1790 lifecycle.register_hook(Arc::new(CountingHook {
1791 connects: connects.clone(),
1792 disconnects: disconnects.clone(),
1793 }));
1794
1795 lifecycle.connect().unwrap();
1796 lifecycle.disconnect().unwrap();
1797 assert_eq!(*connects.lock().unwrap(), 1);
1798 assert_eq!(*disconnects.lock().unwrap(), 1);
1799 }
1800
1801 #[test]
1802 fn retry_policy_retries_transient_errors() {
1803 let mut calls = 0u32;
1804 let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
1805 calls = attempt;
1806 if attempt < 3 {
1807 Err(IntegrationError::new(
1808 "opensearch",
1809 IntegrationErrorKind::Transient,
1810 "temporary failure",
1811 ))
1812 } else {
1813 Ok("ok")
1814 }
1815 })
1816 .unwrap();
1817
1818 assert_eq!(result, "ok");
1819 assert_eq!(calls, 3);
1820 }
1821
1822 #[test]
1823 fn retry_policy_stops_on_permanent_errors() {
1824 let mut calls = 0u32;
1825 let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
1826 calls = attempt;
1827 Err::<(), IntegrationError>(IntegrationError::new(
1828 "singlestore",
1829 IntegrationErrorKind::Permanent,
1830 "invalid sql",
1831 ))
1832 })
1833 .unwrap_err();
1834
1835 assert_eq!(calls, 1);
1836 assert_eq!(err.kind, IntegrationErrorKind::Permanent);
1837 }
1838
1839 #[test]
1840 fn integration_result_maps_into_data_error() {
1841 let mapped = map_integration_result::<()>(
1842 "trigger",
1843 Err(IntegrationError::new(
1844 "trigger",
1845 IntegrationErrorKind::Unavailable,
1846 "service unavailable",
1847 )),
1848 )
1849 .unwrap_err();
1850
1851 assert!(mapped.to_string().contains("service unavailable"));
1852 }
1853
1854 #[derive(Default)]
1855 struct InMemoryJobs {
1856 statuses: Arc<Mutex<Vec<JobStatus>>>,
1857 callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
1858 }
1859
1860 impl JobOrchestrator for InMemoryJobs {
1861 fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
1862 let id = format!("job-{}", request.idempotency_key);
1863 self.statuses.lock().unwrap().push(JobStatus {
1864 id: id.clone(),
1865 state: JobState::Queued,
1866 attempts: 0,
1867 result: None,
1868 error: None,
1869 });
1870 Ok(JobHandle {
1871 id,
1872 workflow: request.workflow,
1873 idempotency_key: request.idempotency_key,
1874 })
1875 }
1876
1877 fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
1878 self.statuses
1879 .lock()
1880 .unwrap()
1881 .iter()
1882 .find(|status| status.id == id)
1883 .cloned()
1884 .ok_or_else(|| {
1885 IntegrationError::new(
1886 "trigger",
1887 IntegrationErrorKind::InvalidInput,
1888 "job not found",
1889 )
1890 })
1891 }
1892
1893 fn poll(
1894 &self,
1895 id: &str,
1896 _attempts: u32,
1897 _backoff_ms: u64,
1898 ) -> super::IntegrationResult<JobStatus> {
1899 self.status(id)
1900 }
1901
1902 fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
1903 self.callbacks.lock().unwrap().push(callback);
1904 }
1905 }
1906
1907 #[test]
1908 fn job_orchestration_contract_supports_enqueue_and_status() {
1909 let jobs = InMemoryJobs::default();
1910 let handle = jobs
1911 .enqueue(JobRequest::new(
1912 "sync_customer",
1913 json!({"id": 42}),
1914 "idempotent-42",
1915 ))
1916 .unwrap();
1917 let status = jobs.status(&handle.id).unwrap();
1918 assert_eq!(status.state, JobState::Queued);
1919 assert_eq!(handle.idempotency_key, "idempotent-42");
1920
1921 let ctx = QueryContext::default().with_tenant_id("tenant-a");
1922 assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
1923 }
1924
1925 #[test]
1926 fn reference_singlestore_adapter_runs_typed_sql_boundary() {
1927 let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
1928 "region".to_string(),
1929 Value::String("EMEA".to_string()),
1930 )])]);
1931 let rows = adapter
1932 .run_query(
1933 SqlCommand::new("SELECT region FROM accounts", Vec::new()),
1934 &QueryContext::default(),
1935 )
1936 .unwrap();
1937 assert_eq!(rows.len(), 1);
1938 assert_eq!(
1939 rows[0].get("region"),
1940 Some(&Value::String("EMEA".to_string()))
1941 );
1942 }
1943
1944 #[test]
1945 fn reference_clickhouse_adapter_runs_typed_sql_boundary() {
1946 let adapter = InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
1947 "events".to_string(),
1948 Value::Number(128.into()),
1949 )])]);
1950 let rows = adapter
1951 .run_query(
1952 SqlCommand::new("SELECT events FROM account_events", Vec::new()),
1953 &QueryContext::default(),
1954 )
1955 .unwrap();
1956 assert_eq!(rows.len(), 1);
1957 assert_eq!(rows[0].get("events"), Some(&Value::Number(128.into())));
1958 }
1959
1960 #[test]
1961 fn reference_bigquery_adapter_runs_typed_sql_boundary() {
1962 let adapter = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
1963 "active_accounts".to_string(),
1964 Value::Number(64.into()),
1965 )])]);
1966 let rows = adapter
1967 .run_query(
1968 SqlCommand::new("SELECT active_accounts FROM analytics_rollup", Vec::new()),
1969 &QueryContext::default(),
1970 )
1971 .unwrap();
1972 assert_eq!(rows.len(), 1);
1973 assert_eq!(
1974 rows[0].get("active_accounts"),
1975 Some(&Value::Number(64.into()))
1976 );
1977 }
1978
1979 #[test]
1980 fn reference_opensearch_adapter_filters_rows() {
1981 let rows = vec![
1982 crate::StoredRow {
1983 id: 1,
1984 data: std::collections::BTreeMap::from([(
1985 "title".to_string(),
1986 Value::String("Acme renewal".to_string()),
1987 )]),
1988 },
1989 crate::StoredRow {
1990 id: 2,
1991 data: std::collections::BTreeMap::from([(
1992 "title".to_string(),
1993 Value::String("Globex onboarding".to_string()),
1994 )]),
1995 },
1996 ];
1997 let adapter = InMemoryOpenSearchAdapter::new(rows);
1998 let response = adapter
1999 .search(
2000 SearchRequest::new("accounts", "renewal"),
2001 &QueryContext::default(),
2002 )
2003 .unwrap();
2004 assert_eq!(response.total_hits, 1);
2005 assert_eq!(response.rows[0].id, 1);
2006 }
2007
2008 #[test]
2009 fn high_volume_window_query_profiles_support_compact_payloads() {
2010 let sql_rows = vec![
2011 std::collections::BTreeMap::from([
2012 ("tenant".to_string(), Value::String("north".to_string())),
2013 ("arr".to_string(), Value::Number(120.into())),
2014 ]),
2015 std::collections::BTreeMap::from([
2016 ("tenant".to_string(), Value::String("south".to_string())),
2017 ("arr".to_string(), Value::Number(210.into())),
2018 ]),
2019 std::collections::BTreeMap::from([
2020 ("tenant".to_string(), Value::String("west".to_string())),
2021 ("arr".to_string(), Value::Number(330.into())),
2022 ]),
2023 std::collections::BTreeMap::from([
2024 ("tenant".to_string(), Value::String("east".to_string())),
2025 ("arr".to_string(), Value::Number(480.into())),
2026 ]),
2027 ];
2028 let context = QueryContext::default();
2029 let request = DataWindowRequest::new("tenant_rollup", 1, 2)
2030 .with_query_token("q:m51")
2031 .compact();
2032
2033 let singlestore = InMemorySingleStoreAdapter::new(sql_rows.clone());
2034 let clickhouse = InMemoryClickHouseAdapter::new(sql_rows.clone());
2035 let bigquery = InMemoryBigQueryAdapter::new(sql_rows.clone());
2036
2037 for response in [
2038 singlestore
2039 .run_high_volume_window_query(
2040 SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2041 request.clone(),
2042 &context,
2043 )
2044 .expect("singlestore window query"),
2045 clickhouse
2046 .run_high_volume_window_query(
2047 SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2048 request.clone(),
2049 &context,
2050 )
2051 .expect("clickhouse window query"),
2052 bigquery
2053 .run_high_volume_window_query(
2054 SqlCommand::new("SELECT * FROM tenant_rollup", Vec::new()),
2055 request.clone(),
2056 &context,
2057 )
2058 .expect("bigquery window query"),
2059 ] {
2060 assert_eq!(response.dataset, "tenant_rollup");
2061 assert_eq!(response.rows.len(), 2);
2062 assert_eq!(response.total_rows, 4);
2063 assert_eq!(response.query_token, "q:m51");
2064 let compact = response.compact_rows.expect("compact payload");
2065 assert!(compact.columns.contains(&"tenant".to_string()));
2066 assert!(compact.columns.contains(&"arr".to_string()));
2067 assert_eq!(compact.rows.len(), response.rows.len());
2068 }
2069 }
2070
2071 #[test]
2072 fn opensearch_window_search_uses_window_contract() {
2073 let rows = vec![
2074 crate::StoredRow {
2075 id: 1,
2076 data: std::collections::BTreeMap::from([
2077 ("tenant".to_string(), Value::String("acme".to_string())),
2078 ("status".to_string(), Value::String("healthy".to_string())),
2079 ]),
2080 },
2081 crate::StoredRow {
2082 id: 2,
2083 data: std::collections::BTreeMap::from([
2084 ("tenant".to_string(), Value::String("globex".to_string())),
2085 ("status".to_string(), Value::String("renewal".to_string())),
2086 ]),
2087 },
2088 crate::StoredRow {
2089 id: 3,
2090 data: std::collections::BTreeMap::from([
2091 ("tenant".to_string(), Value::String("initech".to_string())),
2092 ("status".to_string(), Value::String("at-risk".to_string())),
2093 ]),
2094 },
2095 ];
2096 let adapter = InMemoryOpenSearchAdapter::new(rows);
2097 let response = adapter
2098 .search_window(
2099 SearchRequest::new("accounts", ""),
2100 DataWindowRequest::new("accounts", 1, 2).compact(),
2101 &QueryContext::default(),
2102 )
2103 .expect("opensearch window search");
2104
2105 assert_eq!(response.dataset, "accounts");
2106 assert_eq!(response.offset, 1);
2107 assert_eq!(response.limit, 2);
2108 assert_eq!(response.total_rows, 3);
2109 assert_eq!(response.rows.len(), 2);
2110 assert!(response.compact_rows.is_some());
2111 assert!(!response.window_token.is_empty());
2112 }
2113
2114 #[test]
2115 fn reference_axiom_sink_records_events() {
2116 let sink = InMemoryAxiomSink::default();
2117 sink.send_event(AnalyticsEvent::new(
2118 "sales",
2119 "query_executed",
2120 json!({"latency_ms": 12}),
2121 ))
2122 .unwrap();
2123 let events = sink.events();
2124 assert_eq!(events.len(), 1);
2125 assert_eq!(events[0].name, "query_executed");
2126 }
2127
2128 #[test]
2129 fn reference_trigger_orchestrator_supports_completion_and_polling() {
2130 let orchestrator = InMemoryJobOrchestrator::default();
2131 let completed = Arc::new(Mutex::new(false));
2132 let completed_flag = completed.clone();
2133 orchestrator.register_completion_callback(Arc::new(move |status| {
2134 if status.state == JobState::Succeeded {
2135 if let Ok(mut guard) = completed_flag.lock() {
2136 *guard = true;
2137 }
2138 }
2139 }));
2140
2141 let handle = orchestrator
2142 .enqueue(JobRequest::new(
2143 "refresh_dashboard",
2144 json!({"account_id": 7}),
2145 "refresh-7",
2146 ))
2147 .unwrap();
2148 orchestrator
2149 .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
2150 .unwrap();
2151 let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
2152 assert_eq!(status.state, JobState::Succeeded);
2153 assert_eq!(
2154 status.result,
2155 Some(json!({
2156 "rows_synced": 18
2157 }))
2158 );
2159 assert!(*completed.lock().unwrap());
2160 }
2161
2162 #[test]
2163 fn query_context_helpers_encode_trace_correlation_retry_timeout() {
2164 let context = QueryContext::default()
2165 .with_correlation_id("corr-42")
2166 .with_request_id("req-42")
2167 .with_timeout_ms(900)
2168 .with_retry_policy(RetryPolicy {
2169 max_attempts: 4,
2170 initial_backoff_ms: 25,
2171 max_backoff_ms: 100,
2172 });
2173
2174 assert_eq!(
2175 context
2176 .tags
2177 .get(CONTEXT_TAG_CORRELATION_ID)
2178 .map(String::as_str),
2179 Some("corr-42")
2180 );
2181 assert_eq!(
2182 context.tags.get(CONTEXT_TAG_REQUEST_ID).map(String::as_str),
2183 Some("req-42")
2184 );
2185 assert_eq!(
2186 context.tags.get(CONTEXT_TAG_TIMEOUT_MS).map(String::as_str),
2187 Some("900")
2188 );
2189 assert_eq!(context.correlation_id(), Some("corr-42"));
2190 assert_eq!(context.request_id(), Some("req-42"));
2191 assert_eq!(context.timeout_ms(), Some(900));
2192 assert_eq!(
2193 context.retry_policy_override(),
2194 Some(RetryPolicy {
2195 max_attempts: 4,
2196 initial_backoff_ms: 25,
2197 max_backoff_ms: 100
2198 })
2199 );
2200 }
2201
2202 #[test]
2203 fn run_with_contract_respects_context_retry_override() {
2204 let context = QueryContext::default().with_retry_policy(RetryPolicy {
2205 max_attempts: 2,
2206 initial_backoff_ms: 0,
2207 max_backoff_ms: 0,
2208 });
2209 let mut attempts = 0u32;
2210 let result = run_with_contract(
2211 "opensearch",
2212 "search",
2213 AdapterCallContract::default()
2214 .with_retry_policy(RetryPolicy::never())
2215 .with_timeout_ms(1_000),
2216 &context,
2217 |attempt| {
2218 attempts = attempt;
2219 if attempt == 1 {
2220 Err(IntegrationError::new(
2221 "opensearch",
2222 IntegrationErrorKind::Transient,
2223 "temporary failure",
2224 ))
2225 } else {
2226 Ok("ok")
2227 }
2228 },
2229 )
2230 .unwrap();
2231 assert_eq!(result, "ok");
2232 assert_eq!(attempts, 2);
2233 }
2234
2235 #[tokio::test]
2236 async fn run_with_contract_async_respects_context_retry_override() {
2237 let context = QueryContext::default().with_retry_policy(RetryPolicy {
2238 max_attempts: 2,
2239 initial_backoff_ms: 0,
2240 max_backoff_ms: 0,
2241 });
2242 let attempts = Arc::new(Mutex::new(0u32));
2243 let attempts_for_closure = attempts.clone();
2244 let result = run_with_contract_async(
2245 "opensearch",
2246 "search_async",
2247 AdapterCallContract::default()
2248 .with_retry_policy(RetryPolicy::never())
2249 .with_timeout_ms(1_000),
2250 &context,
2251 move |attempt| {
2252 let attempts_for_step = attempts_for_closure.clone();
2253 async move {
2254 if let Ok(mut guard) = attempts_for_step.lock() {
2255 *guard = attempt;
2256 }
2257 if attempt == 1 {
2258 Err(IntegrationError::new(
2259 "opensearch",
2260 IntegrationErrorKind::Transient,
2261 "temporary failure",
2262 ))
2263 } else {
2264 Ok("ok")
2265 }
2266 }
2267 },
2268 )
2269 .await
2270 .unwrap();
2271 assert_eq!(result, "ok");
2272 assert_eq!(*attempts.lock().unwrap(), 2);
2273 }
2274
2275 #[tokio::test]
2276 async fn run_with_retry_async_retries_transient_errors() {
2277 let calls = Arc::new(Mutex::new(0u32));
2278 let calls_for_closure = calls.clone();
2279 let result = run_with_retry_async(RetryPolicy::conservative(), move |attempt| {
2280 let calls_for_step = calls_for_closure.clone();
2281 async move {
2282 if let Ok(mut guard) = calls_for_step.lock() {
2283 *guard = attempt;
2284 }
2285 if attempt < 3 {
2286 Err(IntegrationError::new(
2287 "bigquery",
2288 IntegrationErrorKind::Transient,
2289 "temporary failure",
2290 ))
2291 } else {
2292 Ok("ok")
2293 }
2294 }
2295 })
2296 .await
2297 .unwrap();
2298 assert_eq!(result, "ok");
2299 assert_eq!(*calls.lock().unwrap(), 3);
2300 }
2301
2302 #[test]
2303 fn run_with_contract_returns_timeout_error() {
2304 let context = QueryContext::default().with_timeout_ms(5);
2305 let err = run_with_contract(
2306 "singlestore",
2307 "run_query",
2308 AdapterCallContract::default(),
2309 &context,
2310 |_| {
2311 std::thread::sleep(std::time::Duration::from_millis(15));
2312 Ok::<_, IntegrationError>("done")
2313 },
2314 )
2315 .unwrap_err();
2316 assert_eq!(err.kind, IntegrationErrorKind::Timeout);
2317 assert_eq!(err.code.as_deref(), Some("operation_timeout"));
2318 }
2319
2320 #[test]
2321 fn axiom_bridge_enriches_events_with_trace_and_correlation() {
2322 let sink = Arc::new(InMemoryAxiomSink::default());
2323 let bridge = AxiomTelemetryBridge::new(sink.clone(), "runtime");
2324 let context = QueryContext {
2325 tenant_id: Some("tenant-a".to_string()),
2326 trace_id: Some("trace-123".to_string()),
2327 tags: std::collections::BTreeMap::new(),
2328 }
2329 .with_correlation_id("corr-9")
2330 .with_request_id("req-9");
2331 bridge
2332 .emit("session_event", json!({"event":"patch"}), &context)
2333 .unwrap();
2334 let events = sink.events();
2335 assert_eq!(events.len(), 1);
2336 let payload = events[0].payload.as_object().unwrap();
2337 assert_eq!(payload.get("trace_id"), Some(&json!("trace-123")));
2338 assert_eq!(payload.get("correlation_id"), Some(&json!("corr-9")));
2339 assert_eq!(payload.get("request_id"), Some(&json!("req-9")));
2340 }
2341
2342 #[test]
2343 fn conformance_suite_passes_with_reference_adapters() {
2344 let singlestore = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from(
2345 [("tenant".to_string(), Value::String("north".to_string()))],
2346 )]);
2347 let clickhouse =
2348 InMemoryClickHouseAdapter::new(vec![std::collections::BTreeMap::from([(
2349 "events".to_string(),
2350 Value::Number(99.into()),
2351 )])]);
2352 let bigquery = InMemoryBigQueryAdapter::new(vec![std::collections::BTreeMap::from([(
2353 "accounts".to_string(),
2354 Value::Number(33.into()),
2355 )])]);
2356 let opensearch = InMemoryOpenSearchAdapter::new(vec![crate::StoredRow {
2357 id: 1,
2358 data: std::collections::BTreeMap::from([(
2359 "title".to_string(),
2360 Value::String("Acme renewal".to_string()),
2361 )]),
2362 }]);
2363 let trigger = InMemoryJobOrchestrator::default();
2364 let analytics = InMemoryAxiomSink::default();
2365 let report = run_adapter_conformance_suite(
2366 &singlestore,
2367 &clickhouse,
2368 &bigquery,
2369 &opensearch,
2370 &trigger,
2371 &analytics,
2372 &QueryContext::default()
2373 .with_correlation_id("corr-1")
2374 .with_request_id("req-1"),
2375 );
2376 assert!(report.passed(), "report={report:?}");
2377 assert!(report.checks.len() >= 8);
2378 }
2379
2380 #[test]
2381 fn adapter_error_paths_cover_invalid_inputs_and_failure_reporting() {
2382 let context = QueryContext::default();
2383 let empty_sql = SqlCommand::new(" ", Vec::new());
2384 let singlestore = InMemorySingleStoreAdapter::default();
2385 let clickhouse = InMemoryClickHouseAdapter::default();
2386 let bigquery = InMemoryBigQueryAdapter::default();
2387 let opensearch = InMemoryOpenSearchAdapter::default();
2388
2389 for err in [
2390 singlestore
2391 .run_query(empty_sql.clone(), &context)
2392 .unwrap_err(),
2393 clickhouse.run_query(empty_sql.clone(), &context).unwrap_err(),
2394 bigquery.run_query(empty_sql, &context).unwrap_err(),
2395 ] {
2396 assert_eq!(err.kind, IntegrationErrorKind::InvalidInput);
2397 assert_eq!(err.code.as_deref(), Some("empty_statement"));
2398 }
2399
2400 let search_err = opensearch
2401 .search(SearchRequest::new(" ", "needle"), &context)
2402 .unwrap_err();
2403 assert_eq!(search_err.kind, IntegrationErrorKind::InvalidInput);
2404 assert_eq!(search_err.code.as_deref(), Some("empty_index"));
2405 }
2406
2407 struct FailingSqlAdapter {
2408 source: &'static str,
2409 }
2410
2411 impl TypedQueryBoundary for FailingSqlAdapter {
2412 type Request = SqlCommand;
2413 type Response = Vec<crate::Row>;
2414
2415 fn execute(
2416 &self,
2417 _request: &Self::Request,
2418 _context: &QueryContext,
2419 ) -> super::IntegrationResult<Self::Response> {
2420 Err(IntegrationError::new(
2421 self.source,
2422 IntegrationErrorKind::Unavailable,
2423 "backend unavailable",
2424 )
2425 .with_code("backend_down"))
2426 }
2427 }
2428
2429 impl SingleStoreAdapter for FailingSqlAdapter {}
2430 impl ClickHouseAdapter for FailingSqlAdapter {}
2431 impl BigQueryAdapter for FailingSqlAdapter {}
2432
2433 struct FailingSearchAdapter;
2434
2435 impl TypedQueryBoundary for FailingSearchAdapter {
2436 type Request = SearchRequest;
2437 type Response = SearchResponse;
2438
2439 fn execute(
2440 &self,
2441 _request: &Self::Request,
2442 _context: &QueryContext,
2443 ) -> super::IntegrationResult<Self::Response> {
2444 Err(IntegrationError::new(
2445 "opensearch",
2446 IntegrationErrorKind::Unavailable,
2447 "search unavailable",
2448 ))
2449 }
2450 }
2451
2452 impl OpenSearchAdapter for FailingSearchAdapter {}
2453
2454 struct FailingTrigger;
2455
2456 impl TriggerDevAdapter for FailingTrigger {
2457 fn trigger_workflow(&self, _request: JobRequest) -> super::IntegrationResult<JobHandle> {
2458 Err(IntegrationError::new(
2459 "trigger",
2460 IntegrationErrorKind::Unavailable,
2461 "trigger unavailable",
2462 ))
2463 }
2464
2465 fn workflow_status(&self, _id: &str) -> super::IntegrationResult<JobStatus> {
2466 Err(IntegrationError::new(
2467 "trigger",
2468 IntegrationErrorKind::Unavailable,
2469 "trigger unavailable",
2470 ))
2471 }
2472
2473 fn poll_workflow(
2474 &self,
2475 _id: &str,
2476 _attempts: u32,
2477 _backoff_ms: u64,
2478 ) -> super::IntegrationResult<JobStatus> {
2479 Err(IntegrationError::new(
2480 "trigger",
2481 IntegrationErrorKind::Unavailable,
2482 "trigger unavailable",
2483 ))
2484 }
2485 }
2486
2487 #[derive(Default)]
2488 struct FailingAnalyticsSink;
2489
2490 impl AnalyticsSink for FailingAnalyticsSink {
2491 fn send_event(&self, _event: AnalyticsEvent) -> super::IntegrationResult<()> {
2492 Err(IntegrationError::new(
2493 "analytics",
2494 IntegrationErrorKind::Unavailable,
2495 "sink unavailable",
2496 ))
2497 }
2498 }
2499
2500 #[test]
2501 fn conformance_suite_captures_failures_for_unavailable_dependencies() {
2502 let failing_sql = FailingSqlAdapter { source: "sql" };
2503 let failing_search = FailingSearchAdapter;
2504 let failing_trigger = FailingTrigger;
2505 let failing_analytics = FailingAnalyticsSink;
2506
2507 let report = run_adapter_conformance_suite(
2508 &failing_sql,
2509 &failing_sql,
2510 &failing_sql,
2511 &failing_search,
2512 &failing_trigger,
2513 &failing_analytics,
2514 &QueryContext::default(),
2515 );
2516
2517 assert!(!report.passed());
2518 assert!(report.checks.iter().any(|check| {
2519 check.name == "singlestore.query" && !check.passed
2520 }));
2521 assert!(report.checks.iter().any(|check| {
2522 check.name == "opensearch.search" && !check.passed
2523 }));
2524 assert!(report.checks.iter().any(|check| {
2525 check.name == "trigger.enqueue" && !check.passed
2526 }));
2527 assert!(report.checks.iter().any(|check| {
2528 check.name == "analytics.emit" && !check.passed
2529 }));
2530 assert!(report.checks.iter().any(|check| {
2531 check.name == "context.correlation" && !check.passed
2532 }));
2533 }
2534}