1use crate::{
2 error::{DataError, DataResult},
3 query::Query,
4 repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt,
11 sync::{Arc, Mutex},
12 thread,
13 time::{Duration, Instant},
14};
15use tracing::{info, warn};
16
/// Classification of a failure reported by an integration.
///
/// Serialized with `snake_case` variant names (for example `rate_limited`).
/// `IntegrationError::new` marks `Transient`, `RateLimited`, `Timeout` and
/// `Unavailable` as retryable; every other variant is created non-retryable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    // Retryable by default.
    Transient,
    // Not retryable by default.
    Permanent,
    // Not retryable by default.
    Auth,
    // Retryable by default.
    RateLimited,
    // Retryable by default.
    Timeout,
    // Retryable by default.
    Unavailable,
    // Not retryable by default.
    InvalidInput,
}
28
29#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
30pub struct IntegrationError {
31 pub source: String,
32 pub kind: IntegrationErrorKind,
33 pub message: String,
34 pub code: Option<String>,
35 pub retryable: bool,
36}
37
38impl IntegrationError {
39 pub fn new(
40 source: impl Into<String>,
41 kind: IntegrationErrorKind,
42 message: impl Into<String>,
43 ) -> Self {
44 let kind_value = kind;
45 Self {
46 source: source.into(),
47 kind: kind_value,
48 message: message.into(),
49 code: None,
50 retryable: matches!(
51 kind_value,
52 IntegrationErrorKind::Transient
53 | IntegrationErrorKind::RateLimited
54 | IntegrationErrorKind::Timeout
55 | IntegrationErrorKind::Unavailable
56 ),
57 }
58 }
59
60 pub fn with_code(mut self, code: impl Into<String>) -> Self {
61 self.code = Some(code.into());
62 self
63 }
64}
65
66impl fmt::Display for IntegrationError {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 if let Some(code) = &self.code {
69 write!(f, "[{}:{}] {}", self.source, code, self.message)
70 } else {
71 write!(f, "[{}] {}", self.source, self.message)
72 }
73 }
74}
75
// Empty impl: lets `IntegrationError` be used wherever `std::error::Error` is
// required, relying on its `Debug` and `Display` implementations.
impl std::error::Error for IntegrationError {}

/// `Result` alias whose error type is [`IntegrationError`].
pub type IntegrationResult<T> = Result<T, IntegrationError>;
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81pub struct RetryPolicy {
82 pub max_attempts: u32,
83 pub initial_backoff_ms: u64,
84 pub max_backoff_ms: u64,
85}
86
87impl RetryPolicy {
88 pub fn conservative() -> Self {
89 Self {
90 max_attempts: 3,
91 initial_backoff_ms: 50,
92 max_backoff_ms: 500,
93 }
94 }
95
96 pub fn never() -> Self {
97 Self {
98 max_attempts: 1,
99 initial_backoff_ms: 0,
100 max_backoff_ms: 0,
101 }
102 }
103}
104
105impl Default for RetryPolicy {
106 fn default() -> Self {
107 Self::conservative()
108 }
109}
110
111pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
112where
113 F: FnMut(u32) -> IntegrationResult<T>,
114{
115 let attempts = policy.max_attempts.max(1);
116 let mut backoff_ms = policy.initial_backoff_ms;
117
118 for attempt in 1..=attempts {
119 match operation(attempt) {
120 Ok(value) => return Ok(value),
121 Err(err) if !err.retryable || attempt == attempts => return Err(err),
122 Err(_) => {
123 if backoff_ms > 0 {
124 thread::sleep(Duration::from_millis(backoff_ms));
125 }
126 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
127 }
128 }
129 }
130
131 Err(IntegrationError::new(
132 "retry",
133 IntegrationErrorKind::Transient,
134 "retry exhausted",
135 ))
136}
137
/// Callbacks invoked around a connection's lifetime.
///
/// Both methods have default bodies that return `Ok(())`, so an implementor
/// only needs to override the events it cares about.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Connect callback; the default does nothing and succeeds.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    /// Disconnect callback; the default does nothing and succeeds.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}
147
/// Something that owns lifecycle hooks and can fire them.
///
/// The implementation in this file, `LifecycleHooks`, runs hooks in
/// registration order and stops at the first hook that returns an error.
pub trait ConnectionLifecycle {
    /// Adds a hook to be run by later `connect` / `disconnect` calls.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    /// Fires the connect event.
    fn connect(&self) -> IntegrationResult<()>;
    /// Fires the disconnect event.
    fn disconnect(&self) -> IntegrationResult<()>;
}
153
154#[derive(Default)]
155pub struct LifecycleHooks {
156 hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
157}
158
159impl LifecycleHooks {
160 pub fn new() -> Self {
161 Self::default()
162 }
163}
164
165impl ConnectionLifecycle for LifecycleHooks {
166 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
167 self.hooks.push(hook);
168 }
169
170 fn connect(&self) -> IntegrationResult<()> {
171 for hook in &self.hooks {
172 hook.on_connect()?;
173 }
174 Ok(())
175 }
176
177 fn disconnect(&self) -> IntegrationResult<()> {
178 for hook in &self.hooks {
179 hook.on_disconnect()?;
180 }
181 Ok(())
182 }
183}
184
/// Per-call metadata handed to adapters alongside the request.
///
/// The adapter traits in this file only read it for logging: they record
/// `tenant_id`, `trace_id` and the number of entries in `tags`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    /// Optional tenant identifier.
    pub tenant_id: Option<String>,
    /// Optional trace identifier.
    pub trace_id: Option<String>,
    /// Free-form key/value tags, kept in key order by the `BTreeMap`.
    pub tags: BTreeMap<String, String>,
}
191
/// Generic request/response boundary implemented by each adapter.
///
/// `SingleStoreAdapter` and `OpenSearchAdapter` are built on top of this trait
/// by fixing the two associated types.
pub trait TypedQueryBoundary {
    /// Input accepted by `execute`.
    type Request: Send + Sync;
    /// Output produced by a successful `execute`.
    type Response: Send + Sync;

    /// Executes `request` with the given `context`, returning the response or
    /// an `IntegrationError`.
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
202
203#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
204pub struct SqlCommand {
205 pub statement: String,
206 pub params: Vec<Value>,
207}
208
209impl SqlCommand {
210 pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
211 Self {
212 statement: statement.into(),
213 params,
214 }
215 }
216}
217
218pub trait SingleStoreAdapter:
219 TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
220{
221 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
222 let started_at = Instant::now();
223 let statement = query.statement.as_str();
224 let param_count = query.params.len();
225 let result = self.execute(&query, context);
226 match &result {
227 Ok(rows) => info!(
228 target: "shelly.integration.query",
229 source = "singlestore",
230 operation = "run_query",
231 tenant_id = ?context.tenant_id,
232 trace_id = ?context.trace_id,
233 tag_count = context.tags.len(),
234 statement,
235 param_count,
236 row_count = rows.len(),
237 duration_ms = started_at.elapsed().as_millis() as u64,
238 "Shelly integration query executed"
239 ),
240 Err(err) => warn!(
241 target: "shelly.integration.query",
242 source = "singlestore",
243 operation = "run_query",
244 tenant_id = ?context.tenant_id,
245 trace_id = ?context.trace_id,
246 tag_count = context.tags.len(),
247 statement,
248 param_count,
249 duration_ms = started_at.elapsed().as_millis() as u64,
250 error = %err,
251 "Shelly integration query failed"
252 ),
253 }
254 result
255 }
256}
257
258#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
259pub struct SearchRequest {
260 pub index: String,
261 pub text: String,
262 pub filters: Vec<crate::query::Filter>,
263 pub page: usize,
264 pub per_page: usize,
265}
266
267impl SearchRequest {
268 pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
269 Self {
270 index: index.into(),
271 text: text.into(),
272 filters: Vec::new(),
273 page: 1,
274 per_page: 25,
275 }
276 }
277
278 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
279 self.filters.push(filter);
280 self
281 }
282}
283
/// Result of a search.
///
/// In `InMemoryOpenSearchAdapter`, `total_hits` counts every matching row
/// while `rows` holds only the requested page.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Number of matches.
    pub total_hits: usize,
    /// Rows returned for this response.
    pub rows: Vec<StoredRow>,
}
289
290pub trait OpenSearchAdapter:
291 TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
292{
293 fn search(
294 &self,
295 request: SearchRequest,
296 context: &QueryContext,
297 ) -> IntegrationResult<SearchResponse> {
298 let started_at = Instant::now();
299 let index = request.index.as_str();
300 let text = request.text.as_str();
301 let filter_count = request.filters.len();
302 let page = request.page;
303 let per_page = request.per_page;
304 let result = self.execute(&request, context);
305 match &result {
306 Ok(response) => info!(
307 target: "shelly.integration.query",
308 source = "opensearch",
309 operation = "search",
310 tenant_id = ?context.tenant_id,
311 trace_id = ?context.trace_id,
312 tag_count = context.tags.len(),
313 index,
314 text,
315 filter_count,
316 page,
317 per_page,
318 row_count = response.rows.len(),
319 total_hits = response.total_hits,
320 duration_ms = started_at.elapsed().as_millis() as u64,
321 "Shelly integration query executed"
322 ),
323 Err(err) => warn!(
324 target: "shelly.integration.query",
325 source = "opensearch",
326 operation = "search",
327 tenant_id = ?context.tenant_id,
328 trace_id = ?context.trace_id,
329 tag_count = context.tags.len(),
330 index,
331 text,
332 filter_count,
333 page,
334 per_page,
335 duration_ms = started_at.elapsed().as_millis() as u64,
336 error = %err,
337 "Shelly integration query failed"
338 ),
339 }
340 result
341 }
342}
343
/// Destination for analytics events.
pub trait AnalyticsSink: Send + Sync {
    /// Delivers one event, taking ownership of it.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
347
348#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349pub struct AnalyticsEvent {
350 pub namespace: String,
351 pub name: String,
352 pub payload: Value,
353}
354
355impl AnalyticsEvent {
356 pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
357 Self {
358 namespace: namespace.into(),
359 name: name.into(),
360 payload,
361 }
362 }
363}
364
365#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
366pub struct JobRequest {
367 pub workflow: String,
368 pub payload: Value,
369 pub idempotency_key: String,
370 pub metadata: BTreeMap<String, String>,
371}
372
373impl JobRequest {
374 pub fn new(
375 workflow: impl Into<String>,
376 payload: Value,
377 idempotency_key: impl Into<String>,
378 ) -> Self {
379 Self {
380 workflow: workflow.into(),
381 payload,
382 idempotency_key: idempotency_key.into(),
383 metadata: BTreeMap::new(),
384 }
385 }
386}
387
/// Lifecycle state of a job, serialized with `snake_case` names.
///
/// `InMemoryJobOrchestrator` creates jobs as `Queued` and treats `Succeeded`,
/// `Failed` and `Canceled` as terminal when polling.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}
397
/// Snapshot of a job's progress.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    /// Job identifier.
    pub id: String,
    /// Current state.
    pub state: JobState,
    /// Attempt counter. `InMemoryJobOrchestrator` starts it at 0 and increments
    /// it each time the job is marked succeeded or failed.
    pub attempts: u32,
    /// Result payload, if any.
    pub result: Option<Value>,
    /// Failure details, if any.
    pub error: Option<IntegrationError>,
}
406
/// Reference to an enqueued job, returned by `JobOrchestrator::enqueue`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    /// Identifier to pass to `status` / `poll`.
    pub id: String,
    /// Workflow name copied from the request.
    pub workflow: String,
    /// Idempotency key copied from the request.
    pub idempotency_key: String,
}
413
/// Shared, thread-safe callback that receives a job's status.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;

/// Interface for submitting jobs and observing their progress.
pub trait JobOrchestrator: Send + Sync {
    /// Submits a job and returns a handle to it.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the current status of the job with the given id.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Checks the job's status repeatedly. In `InMemoryJobOrchestrator` this
    /// makes at most `attempts` checks, sleeping `backoff_ms` milliseconds
    /// between them, and returns early once the job is terminal.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    /// Registers a callback. `InMemoryJobOrchestrator` invokes registered
    /// callbacks when a job is marked succeeded or failed.
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
422
423#[derive(Debug, Clone, Default)]
424pub struct InMemorySingleStoreAdapter {
425 rows: Vec<Row>,
426}
427
428impl InMemorySingleStoreAdapter {
429 pub fn new(rows: Vec<Row>) -> Self {
430 Self { rows }
431 }
432}
433
434impl TypedQueryBoundary for InMemorySingleStoreAdapter {
435 type Request = SqlCommand;
436 type Response = Vec<Row>;
437
438 fn execute(
439 &self,
440 request: &Self::Request,
441 _context: &QueryContext,
442 ) -> IntegrationResult<Self::Response> {
443 if request.statement.trim().is_empty() {
444 return Err(IntegrationError::new(
445 "singlestore",
446 IntegrationErrorKind::InvalidInput,
447 "empty SQL statement",
448 )
449 .with_code("empty_statement"));
450 }
451 Ok(self.rows.clone())
452 }
453}
454
455impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
456
457#[derive(Debug, Clone, Default)]
458pub struct InMemoryOpenSearchAdapter {
459 rows: Vec<StoredRow>,
460}
461
462impl InMemoryOpenSearchAdapter {
463 pub fn new(rows: Vec<StoredRow>) -> Self {
464 Self { rows }
465 }
466}
467
468impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
469 type Request = SearchRequest;
470 type Response = SearchResponse;
471
472 fn execute(
473 &self,
474 request: &Self::Request,
475 _context: &QueryContext,
476 ) -> IntegrationResult<Self::Response> {
477 if request.index.trim().is_empty() {
478 return Err(IntegrationError::new(
479 "opensearch",
480 IntegrationErrorKind::InvalidInput,
481 "search index must not be empty",
482 )
483 .with_code("empty_index"));
484 }
485 let needle = request.text.trim().to_lowercase();
486 let mut filtered = self
487 .rows
488 .iter()
489 .filter(|row| {
490 if needle.is_empty() {
491 return true;
492 }
493 row.data.values().any(|value| {
494 value
495 .as_str()
496 .map(|text| text.to_lowercase().contains(&needle))
497 .unwrap_or(false)
498 })
499 })
500 .cloned()
501 .collect::<Vec<_>>();
502 let total_hits = filtered.len();
503 let page = request.page.max(1);
504 let per_page = request.per_page.max(1);
505 let offset = (page - 1) * per_page;
506 filtered = filtered.into_iter().skip(offset).take(per_page).collect();
507 Ok(SearchResponse {
508 total_hits,
509 rows: filtered,
510 })
511 }
512}
513
514impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
515
516#[derive(Debug, Default, Clone)]
517pub struct InMemoryAxiomSink {
518 events: Arc<Mutex<Vec<AnalyticsEvent>>>,
519}
520
521impl InMemoryAxiomSink {
522 pub fn events(&self) -> Vec<AnalyticsEvent> {
523 self.events
524 .lock()
525 .map(|events| events.clone())
526 .unwrap_or_default()
527 }
528}
529
530impl AnalyticsSink for InMemoryAxiomSink {
531 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
532 let started_at = Instant::now();
533 let namespace = event.namespace.clone();
534 let name = event.name.clone();
535 self.events
536 .lock()
537 .map_err(|_| {
538 IntegrationError::new(
539 "axiom",
540 IntegrationErrorKind::Unavailable,
541 "analytics sink lock poisoned",
542 )
543 })?
544 .push(event);
545 info!(
546 target: "shelly.integration.query",
547 source = "axiom",
548 operation = "send_event",
549 namespace,
550 event_name = name,
551 duration_ms = started_at.elapsed().as_millis() as u64,
552 "Shelly integration call executed"
553 );
554 Ok(())
555 }
556}
557
558#[derive(Default)]
559pub struct InMemoryJobOrchestrator {
560 statuses: Mutex<HashMap<String, JobStatus>>,
561 callbacks: Mutex<Vec<JobCompletionCallback>>,
562 next_id: Mutex<u64>,
563}
564
565impl InMemoryJobOrchestrator {
566 pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
567 let status = self.with_status_mut(id, |status| {
568 status.state = JobState::Succeeded;
569 status.result = Some(result);
570 status.error = None;
571 })?;
572 self.notify(&status);
573 Ok(())
574 }
575
576 pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
577 let status = self.with_status_mut(id, |status| {
578 status.state = JobState::Failed;
579 status.result = None;
580 status.error = Some(error);
581 })?;
582 self.notify(&status);
583 Ok(())
584 }
585
586 fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
587 where
588 F: FnOnce(&mut JobStatus),
589 {
590 let mut statuses = self.statuses.lock().map_err(|_| {
591 IntegrationError::new(
592 "trigger",
593 IntegrationErrorKind::Unavailable,
594 "job status lock poisoned",
595 )
596 })?;
597 let Some(status) = statuses.get_mut(id) else {
598 return Err(IntegrationError::new(
599 "trigger",
600 IntegrationErrorKind::InvalidInput,
601 "job not found",
602 )
603 .with_code("job_not_found"));
604 };
605 status.attempts = status.attempts.saturating_add(1);
606 update(status);
607 Ok(status.clone())
608 }
609
610 fn notify(&self, status: &JobStatus) {
611 if let Ok(callbacks) = self.callbacks.lock() {
612 for callback in callbacks.iter() {
613 callback(status);
614 }
615 }
616 }
617
618 fn is_terminal(state: JobState) -> bool {
619 matches!(
620 state,
621 JobState::Succeeded | JobState::Failed | JobState::Canceled
622 )
623 }
624}
625
626impl JobOrchestrator for InMemoryJobOrchestrator {
627 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
628 let started_at = Instant::now();
629 let workflow = request.workflow.clone();
630 let idempotency_key = request.idempotency_key.clone();
631 let mut next_id = self.next_id.lock().map_err(|_| {
632 IntegrationError::new(
633 "trigger",
634 IntegrationErrorKind::Unavailable,
635 "job id lock poisoned",
636 )
637 })?;
638 *next_id = next_id.saturating_add(1);
639 let id = format!("job-{next_id}");
640 let status = JobStatus {
641 id: id.clone(),
642 state: JobState::Queued,
643 attempts: 0,
644 result: None,
645 error: None,
646 };
647 self.statuses
648 .lock()
649 .map_err(|_| {
650 IntegrationError::new(
651 "trigger",
652 IntegrationErrorKind::Unavailable,
653 "job status lock poisoned",
654 )
655 })?
656 .insert(id.clone(), status);
657 let handle = JobHandle {
658 id,
659 workflow: request.workflow,
660 idempotency_key: request.idempotency_key,
661 };
662 info!(
663 target: "shelly.integration.query",
664 source = "trigger",
665 operation = "enqueue",
666 workflow,
667 idempotency_key,
668 job_id = handle.id.as_str(),
669 duration_ms = started_at.elapsed().as_millis() as u64,
670 "Shelly integration call executed"
671 );
672 Ok(handle)
673 }
674
675 fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
676 let started_at = Instant::now();
677 let result = self
678 .statuses
679 .lock()
680 .map_err(|_| {
681 IntegrationError::new(
682 "trigger",
683 IntegrationErrorKind::Unavailable,
684 "job status lock poisoned",
685 )
686 })?
687 .get(id)
688 .cloned()
689 .ok_or_else(|| {
690 IntegrationError::new(
691 "trigger",
692 IntegrationErrorKind::InvalidInput,
693 "job not found",
694 )
695 .with_code("job_not_found")
696 });
697 match &result {
698 Ok(status) => info!(
699 target: "shelly.integration.query",
700 source = "trigger",
701 operation = "status",
702 job_id = id,
703 job_state = ?status.state,
704 attempts = status.attempts,
705 duration_ms = started_at.elapsed().as_millis() as u64,
706 "Shelly integration call executed"
707 ),
708 Err(err) => warn!(
709 target: "shelly.integration.query",
710 source = "trigger",
711 operation = "status",
712 job_id = id,
713 duration_ms = started_at.elapsed().as_millis() as u64,
714 error = %err,
715 "Shelly integration call failed"
716 ),
717 }
718 result
719 }
720
721 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
722 let started_at = Instant::now();
723 let attempts = attempts.max(1);
724 let mut polled = 0u32;
725 for current in 1..=attempts {
726 let status = self.status(id)?;
727 polled = current;
728 if Self::is_terminal(status.state) || current == attempts {
729 info!(
730 target: "shelly.integration.query",
731 source = "trigger",
732 operation = "poll",
733 job_id = id,
734 attempts = polled,
735 backoff_ms,
736 terminal = Self::is_terminal(status.state),
737 job_state = ?status.state,
738 duration_ms = started_at.elapsed().as_millis() as u64,
739 "Shelly integration call executed"
740 );
741 return Ok(status);
742 }
743 if backoff_ms > 0 {
744 thread::sleep(Duration::from_millis(backoff_ms));
745 }
746 }
747
748 let result = self.status(id);
749 match &result {
750 Ok(status) => info!(
751 target: "shelly.integration.query",
752 source = "trigger",
753 operation = "poll",
754 job_id = id,
755 attempts = polled,
756 backoff_ms,
757 terminal = Self::is_terminal(status.state),
758 job_state = ?status.state,
759 duration_ms = started_at.elapsed().as_millis() as u64,
760 "Shelly integration call executed"
761 ),
762 Err(err) => warn!(
763 target: "shelly.integration.query",
764 source = "trigger",
765 operation = "poll",
766 job_id = id,
767 attempts = polled,
768 backoff_ms,
769 duration_ms = started_at.elapsed().as_millis() as u64,
770 error = %err,
771 "Shelly integration call failed"
772 ),
773 }
774 result
775 }
776
777 fn register_completion_callback(&self, callback: JobCompletionCallback) {
778 if let Ok(mut callbacks) = self.callbacks.lock() {
779 callbacks.push(callback);
780 info!(
781 target: "shelly.integration.query",
782 source = "trigger",
783 operation = "register_completion_callback",
784 callback_count = callbacks.len(),
785 "Shelly integration callback registered"
786 );
787 } else {
788 warn!(
789 target: "shelly.integration.query",
790 source = "trigger",
791 operation = "register_completion_callback",
792 "Shelly integration callback registration failed"
793 );
794 }
795 }
796}
797
798pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
799 DataError::Integration(format!("[{}] {}", source.into(), err))
800}
801
802pub fn map_integration_result<T>(
803 source: impl Into<String>,
804 result: IntegrationResult<T>,
805) -> DataResult<T> {
806 result.map_err(|err| map_integration_error(source, err))
807}
808
809pub fn query_from_search(request: &SearchRequest) -> Query {
810 let mut query = Query::new().paginate(request.page, request.per_page);
811 for filter in &request.filters {
812 query = query.where_filter(filter.clone());
813 }
814 query
815}
816
#[cfg(test)]
mod tests {
    use super::{
        map_integration_result, run_with_retry, AnalyticsEvent, AnalyticsSink, ConnectionLifecycle,
        ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
        SingleStoreAdapter, SqlCommand,
    };
    use serde_json::{json, Value};
    use std::sync::{Arc, Mutex};

    /// Hook that counts how often each lifecycle callback fires.
    struct CountingHook {
        connects: Arc<Mutex<u32>>,
        disconnects: Arc<Mutex<u32>>,
    }

    impl ConnectionLifecycleHook for CountingHook {
        fn on_connect(&self) -> super::IntegrationResult<()> {
            *self.connects.lock().unwrap() += 1;
            Ok(())
        }

        fn on_disconnect(&self) -> super::IntegrationResult<()> {
            *self.disconnects.lock().unwrap() += 1;
            Ok(())
        }
    }

    /// One connect plus one disconnect fires each hook callback exactly once.
    #[test]
    fn lifecycle_hooks_are_called() {
        let connect_count = Arc::new(Mutex::new(0));
        let disconnect_count = Arc::new(Mutex::new(0));
        let hook = CountingHook {
            connects: Arc::clone(&connect_count),
            disconnects: Arc::clone(&disconnect_count),
        };
        let mut lifecycle = LifecycleHooks::new();
        lifecycle.register_hook(Arc::new(hook));

        lifecycle.connect().unwrap();
        lifecycle.disconnect().unwrap();

        assert_eq!(*connect_count.lock().unwrap(), 1);
        assert_eq!(*disconnect_count.lock().unwrap(), 1);
    }

    /// Transient failures are retried until the operation succeeds.
    #[test]
    fn retry_policy_retries_transient_errors() {
        let mut last_attempt = 0u32;
        let outcome = run_with_retry(RetryPolicy::conservative(), |attempt| {
            last_attempt = attempt;
            if attempt >= 3 {
                Ok("ok")
            } else {
                Err(IntegrationError::new(
                    "opensearch",
                    IntegrationErrorKind::Transient,
                    "temporary failure",
                ))
            }
        });

        assert_eq!(outcome.unwrap(), "ok");
        assert_eq!(last_attempt, 3);
    }

    /// A permanent failure is returned after the first attempt.
    #[test]
    fn retry_policy_stops_on_permanent_errors() {
        let mut last_attempt = 0u32;
        let outcome = run_with_retry(RetryPolicy::conservative(), |attempt| {
            last_attempt = attempt;
            Err::<(), IntegrationError>(IntegrationError::new(
                "singlestore",
                IntegrationErrorKind::Permanent,
                "invalid sql",
            ))
        });
        let err = outcome.unwrap_err();

        assert_eq!(last_attempt, 1);
        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
    }

    /// The original message survives the conversion into a data error.
    #[test]
    fn integration_result_maps_into_data_error() {
        let failure = IntegrationError::new(
            "trigger",
            IntegrationErrorKind::Unavailable,
            "service unavailable",
        );
        let mapped = map_integration_result::<()>("trigger", Err(failure)).unwrap_err();

        assert!(mapped.to_string().contains("service unavailable"));
    }

    /// Minimal `JobOrchestrator` used to exercise the trait contract itself.
    #[derive(Default)]
    struct InMemoryJobs {
        statuses: Arc<Mutex<Vec<JobStatus>>>,
        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
    }

    impl JobOrchestrator for InMemoryJobs {
        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
            let id = format!("job-{}", request.idempotency_key);
            let queued = JobStatus {
                id: id.clone(),
                state: JobState::Queued,
                attempts: 0,
                result: None,
                error: None,
            };
            self.statuses.lock().unwrap().push(queued);
            Ok(JobHandle {
                id,
                workflow: request.workflow,
                idempotency_key: request.idempotency_key,
            })
        }

        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
            let statuses = self.statuses.lock().unwrap();
            let found = statuses.iter().find(|status| status.id == id).cloned();
            found.ok_or_else(|| {
                IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::InvalidInput,
                    "job not found",
                )
            })
        }

        fn poll(
            &self,
            id: &str,
            _attempts: u32,
            _backoff_ms: u64,
        ) -> super::IntegrationResult<JobStatus> {
            self.status(id)
        }

        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
            self.callbacks.lock().unwrap().push(callback);
        }
    }

    /// Enqueue returns a handle whose job can be looked up as `Queued`.
    #[test]
    fn job_orchestration_contract_supports_enqueue_and_status() {
        let jobs = InMemoryJobs::default();
        let request = JobRequest::new("sync_customer", json!({"id": 42}), "idempotent-42");
        let handle = jobs.enqueue(request).unwrap();

        let status = jobs.status(&handle.id).unwrap();
        assert_eq!(status.state, JobState::Queued);
        assert_eq!(handle.idempotency_key, "idempotent-42");

        let ctx = QueryContext {
            tenant_id: Some("tenant-a".to_string()),
            ..QueryContext::default()
        };
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
    }

    /// The in-memory SQL adapter returns its stored rows through `run_query`.
    #[test]
    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
        use std::collections::BTreeMap;

        let emea = Value::String("EMEA".to_string());
        let stored = BTreeMap::from([("region".to_string(), emea.clone())]);
        let adapter = InMemorySingleStoreAdapter::new(vec![stored]);

        let command = SqlCommand::new("SELECT region FROM accounts", Vec::new());
        let rows = adapter
            .run_query(command, &QueryContext::default())
            .unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("region"), Some(&emea));
    }

    /// Only the row whose text contains the search term is returned.
    #[test]
    fn reference_opensearch_adapter_filters_rows() {
        use std::collections::BTreeMap;

        let renewal = crate::StoredRow {
            id: 1,
            data: BTreeMap::from([(
                "title".to_string(),
                Value::String("Acme renewal".to_string()),
            )]),
        };
        let onboarding = crate::StoredRow {
            id: 2,
            data: BTreeMap::from([(
                "title".to_string(),
                Value::String("Globex onboarding".to_string()),
            )]),
        };
        let adapter = InMemoryOpenSearchAdapter::new(vec![renewal, onboarding]);

        let request = SearchRequest::new("accounts", "renewal");
        let response = adapter.search(request, &QueryContext::default()).unwrap();

        assert_eq!(response.total_hits, 1);
        assert_eq!(response.rows[0].id, 1);
    }

    /// Events sent to the sink can be read back.
    #[test]
    fn reference_axiom_sink_records_events() {
        let sink = InMemoryAxiomSink::default();
        let event = AnalyticsEvent::new("sales", "query_executed", json!({"latency_ms": 12}));
        sink.send_event(event).unwrap();

        let recorded = sink.events();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].name, "query_executed");
    }

    /// Marking a job succeeded fires the callback and is visible via `poll`.
    #[test]
    fn reference_trigger_orchestrator_supports_completion_and_polling() {
        let orchestrator = InMemoryJobOrchestrator::default();
        let completed = Arc::new(Mutex::new(false));
        let completed_flag = Arc::clone(&completed);
        orchestrator.register_completion_callback(Arc::new(move |status| {
            if status.state == JobState::Succeeded {
                if let Ok(mut guard) = completed_flag.lock() {
                    *guard = true;
                }
            }
        }));

        let request = JobRequest::new("refresh_dashboard", json!({"account_id": 7}), "refresh-7");
        let handle = orchestrator.enqueue(request).unwrap();
        orchestrator
            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
            .unwrap();

        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
        assert_eq!(status.state, JobState::Succeeded);
        assert_eq!(status.result, Some(json!({ "rows_synced": 18 })));
        assert!(*completed.lock().unwrap());
    }
}